[GitHub] spark pull request #21884: [SPARK-24960][K8S] explicitly expose ports on dri...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21884#discussion_r206999720
  
--- Diff: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
 ---
@@ -203,4 +212,12 @@ class BasicDriverFeatureStepSuite extends 
SparkFunSuite {
   "spark.files" -> 
"https://localhost:9000/file1.txt,/opt/spark/file2.txt;)
 assert(additionalProperties === expectedSparkConf)
   }
+
+  def containerPort(name: String, portNumber: Int): ContainerPort = {
+val port = new ContainerPort()
--- End diff --

Use `ContainerPortBuilder`
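
For illustration, a minimal sketch of that helper rewritten with the fabric8 builder (a sketch only, mirroring the test helper in the diff above):

```
import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder}

// Sketch: build the port immutably with ContainerPortBuilder instead of
// mutating a bare ContainerPort.
def containerPort(name: String, portNumber: Int): ContainerPort =
  new ContainerPortBuilder()
    .withName(name)
    .withContainerPort(portNumber)
    .build()
```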


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207016169
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
--- End diff --

Yup, on my end this is a low-conviction suggestion - we might start feeling 
pain around this as we add more metric types, but for a first pass this is 
probably fine.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207073149
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

Ok, this can stay as is.


---




[GitHub] spark pull request #21923: [SPARK-24918][Core] Executor Plugin api

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21923#discussion_r207072577
  
--- Diff: core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java 
---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * A plugin which can be automatically instantiated within each Spark 
executor.  Users can specify
+ * plugins which should be created with the "spark.executor.plugins" 
configuration.  An instance
+ * of each plugin will be created for every executor, including those 
created by dynamic allocation,
+ * before the executor starts running any tasks.
+ *
+ * The specific API exposed to the end users is still considered to be very 
unstable.  If implementors
+ * extend this base class, we will *hopefully* be able to keep 
compatibility by providing dummy
+ * implementations for any methods added, but make no guarantees this will 
always be possible across
+ * all spark releases.
+ *
+ * Spark does nothing to verify the plugin is doing legitimate things, or 
to manage the resources
+ * it uses.  A plugin acquires the same privileges as the user running the 
task.  A bad plugin
could also interfere with task execution and make the executor fail in 
unexpected ways.
+ */
+@DeveloperApi
+public class AbstractExecutorPlugin {
--- End diff --

`interface`? A bit more flexibility in terms of the user's desired class 
hierarchy.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207004997
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Yup that's fine - I did some googling, unfortunately there isn't a great 
way to iterate over fields of a case class. You could create a thin wrapper 
object around the array instead though, if we really think the nicer API is 
worthwhile:

```
case class Metrics(values: Seq[Long]) {
  def someMetric1(): Long = values(0)
  def someMetric2(): Long = values(1)
  // ...and so on, one accessor per metric
}
```

Or even this:

```
case class Metrics(metric1: Long, metric2: Long, metric3: Long, ...) {
  def values(): Seq[Long] = Seq(metric1, metric2, metric3, ...)
}
```

The latter would be better because you'd be guaranteed to create the 
struct with the right number of metrics. Though such abstractions are not 
necessary by any means.


---




[GitHub] spark issue #21884: [SPARK-24960][K8S] explicitly expose ports on driver con...

2018-08-01 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21884
  
Thanks @shaneknapp, I'll merge this now.


---




[GitHub] spark issue #21884: [SPARK-24960][K8S] explicitly expose ports on driver con...

2018-08-01 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21884
  
Thanks @adelbertc for the contribution!


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-01 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r207002859
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,34 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
+
+// check if there is a new peak value for any of the executor level 
memory metrics,
+// while reading from the log. SparkListenerStageExecutorMetrics are 
only processed
+// when reading logs.
+liveExecutors.get(executorMetrics.execId)
+  .orElse(deadExecutors.get(executorMetrics.execId)) match {
+  case Some(exec) =>
--- End diff --

From the 
[Scaladoc](https://www.scala-lang.org/api/2.10.2/index.html#scala.Option):

> The most idiomatic way to use an scala.Option instance is to treat it as 
a collection or monad and use map,flatMap, filter, or foreach

It's probably better to follow the Scala conventions here.
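
For comparison, a sketch of the collection-style version (mirroring the surrounding listener code; the update body is elided):

```
liveExecutors.get(executorMetrics.execId)
  .orElse(deadExecutors.get(executorMetrics.execId))
  .foreach { exec =>
    // ...update the executor's peak metrics here...
  }
```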


---




[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.

2018-07-27 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21748#discussion_r205849187
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.scalatest.concurrent.Eventually
+import scala.collection.JavaConverters._
+
+import 
org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, 
INTERVAL, TIMEOUT}
+
+trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
+
+  test("Run in client mode.", k8sTestTag) {
+val labels = Map("spark-app-selector" -> driverPodName)
+val driverPort = 7077
+val blockManagerPort = 1
+val driverService = testBackend
+  .getKubernetesClient
+  .services()
+  .inNamespace(kubernetesTestComponents.namespace)
+  .createNew()
+.withNewMetadata()
+  .withName(s"$driverPodName-svc")
+  .endMetadata()
+.withNewSpec()
+  .withClusterIP("None")
+  .withSelector(labels.asJava)
+  .addNewPort()
+.withName("driver-port")
+.withPort(driverPort)
+.withNewTargetPort(driverPort)
+.endPort()
+  .addNewPort()
+.withName("block-manager")
+.withPort(blockManagerPort)
+.withNewTargetPort(blockManagerPort)
+.endPort()
+  .endSpec()
+.done()
+try {
+  val driverPod = testBackend
+.getKubernetesClient
+.pods()
+.inNamespace(kubernetesTestComponents.namespace)
+.createNew()
+  .withNewMetadata()
+  .withName(driverPodName)
+  .withLabels(labels.asJava)
+  .endMetadata()
+.withNewSpec()
+  .withServiceAccountName("default")
--- End diff --

Yup we can fix this


---




[GitHub] spark issue #21900: [SPARK-23146][K8S][TESTS] Don't set service account name...

2018-07-27 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21900
  
@skonto can you try this with spark-rbac.yaml?


---




[GitHub] spark pull request #21900: [SPARK-23146][K8S][TESTS] Don't set service accou...

2018-07-27 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/21900

[SPARK-23146][K8S][TESTS] Don't set service account name for client mode 
test

## What changes were proposed in this pull request?

Don't set service account name for the pod created in client mode

## How was this patch tested?

Test should continue running smoothly in Jenkins.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/palantir/spark 
fix-integration-test-service-account

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21900






---




[GitHub] spark issue #21900: [SPARK-23146][K8S][TESTS] Don't set service account name...

2018-07-27 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21900
  
ok to test


---




[GitHub] spark issue #21884: [Scheduler][K8s]: explicitly expose ports on driver cont...

2018-07-27 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21884
  
This needs a Spark ticket. I also don't think pods specifically have to 
expose ports. My understanding was that the ports field on the pod was only to 
assign port names, so that the ports can be referenced by name and not by 
number on service objects.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209712159
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
--- End diff --

It probably should be, if only because the variable is annotated with 
`@GuardedBy(this)`; marking this as `synchronized` keeps the code consistent.
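
A sketch of what that would look like, assuming the shape of the surrounding `MemoryManager` code:

```
// Guard the read with the lock named by @GuardedBy(this), matching the
// other memoryUsed accessors.
final def onHeapExecutionMemoryUsed: Long = synchronized {
  onHeapExecutionMemoryPool.memoryUsed
}
```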


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209717523
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
--- End diff --

Should we use a `Clock` instance here for testing?
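
For illustration, a hedged sketch of that kind of injection (the class and constructor parameter here are hypothetical, not the actual `AppStatusListener` signature; `Clock`, `SystemClock`, and `ManualClock` are Spark-internal utilities):

```
import org.apache.spark.util.{Clock, ManualClock, SystemClock}

// Hypothetical listener that takes a Clock so tests can control time.
private[spark] class ExampleListener(clock: Clock = new SystemClock) {
  def onSomeEvent(): Unit = {
    val now = clock.getTimeMillis() // tests pass a ManualClock and advance it
  }
}
```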


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209714883
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
--- End diff --

No need to specifically label this as `Boolean`.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209716819
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
--- End diff --

Does the type need to be explicitly defined here and in the next line?
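
That is, the inferred-type version would read (sketch):

```
event.executorUpdates.foreach { updates =>
  liveExecutors.get(event.execId).foreach { exec =>
    if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
      maybeUpdate(exec, now)
    }
  }
}
```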


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209714796
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values 
for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false 
otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+this()
+Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, 
this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+this()
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+  metrics(idx) = 
executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+}
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current 
executor metric values,
+   * and update the value for any metrics where the new value for the 
metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: 
ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+(0 until ExecutorMetricType.values.length).foreach { idx =>
+   if ( executorMetrics.metrics(idx) > metrics(idx)) {
--- End diff --

Nit: no space after the opening parenthesis.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209715208
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in 
ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
+  private[spark] val name = 
getClass().getName().stripSuffix("$").split("""\.""").last
+}
+
+private[spark] abstract class MemoryManagerExecutorMetricType(
+f: MemoryManager => Long) extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: 
MemoryManager): Long = {
+f(memoryManager)
+  }
+}
+
+private[spark]abstract class MBeanExecutorMetricType(mBeanName: String)
--- End diff --

Put a space after `[spark]`.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209715001
  
--- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
---
@@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager(
 onHeapStorageMemoryPool.memoryUsed + 
offHeapStorageMemoryPool.memoryUsed
   }
 
+  /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = 
onHeapExecutionMemoryPool.memoryUsed
+
+  /**
+   *  Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = 
offHeapExecutionMemoryPool.memoryUsed
--- End diff --

`synchronized` here also and in the below two methods.


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209711557
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as 
part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
--- End diff --

Unclear - if we expose these metrics to some external consumer via an API 
for example, then we almost certainly want to have a schema labelling these 
fields for consumption by e.g. dashboards. I think what we have here is fine 
for now.


---




[GitHub] spark pull request #21923: [SPARK-24918][Core] Executor Plugin api

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21923#discussion_r209805442
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -130,6 +130,12 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
+  Thread.currentThread().setContextClassLoader(replClassLoader)
+  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
+Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf)
--- End diff --

For all cluster managers would this properly load plugins deployed via 
`--jars` in spark-submit or `spark.jars` in the SparkConf? I know that jar 
deployment and when they're available on the classpath may sometimes vary. 
Although worst case this seems like the kind of thing one may prefer to put in 
`spark.executor.extraClassPath` simply because those jars are guaranteed to be 
loaded at JVM boot time.

In fact - I wonder if we should even move this extension loading further up 
in the lifecycle, simply so that the plugin can be around for a larger 
percentage of the executor JVM's uptime.
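
For example, a hypothetical setup (the plugin class and jar path are made up) that names the plugin and guarantees its jar is on the executor classpath at JVM boot:

```
import org.apache.spark.SparkConf

// Illustrative only: com.example.MetricsPlugin and the jar path are
// placeholders, not real artifacts.
val conf = new SparkConf()
  .set("spark.executor.plugins", "com.example.MetricsPlugin")
  .set("spark.executor.extraClassPath", "/opt/plugins/metrics-plugin.jar")
```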


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-08-13 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r209773404
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -669,6 +686,31 @@ private[spark] class AppStatusListener(
 }
   }
 }
+
+// check if there is a new peak value for any of the executor level 
memory metrics
+// for the live UI. SparkListenerExecutorMetricsUpdate events are only 
processed
+// for the live UI.
+event.executorUpdates.foreach { updates: ExecutorMetrics =>
+  liveExecutors.get(event.execId).foreach { exec: LiveExecutor =>
+if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+  maybeUpdate(exec, now)
+}
+  }
+}
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: 
SparkListenerStageExecutorMetrics): Unit = {
+val now = System.nanoTime()
--- End diff --

Yup that's fine.


---




[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-08-14 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21221
  
@edwinalu I think this is almost ready, can you fix merge conflicts? 
@squito @felixcheung any other comments?


---




[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s

2018-08-16 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21584
  
Let's merge at the end of the day pacific time (~5PM-ish) on Friday, August 
17, pending any additional feedback on the mailing list thread discussing the 
subject of including this in 2.4.


---




[GitHub] spark issue #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22146
  
@skonto @erikerlandson 


---




[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-08-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21221
  
We're going to delay on merging this until after the 2.4 branch is cut. We 
can include this in Spark 2.5.


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211802430
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+
+import org.apache.spark.deploy.k8s._
+
+private[spark] class TemplateVolumeStep(
--- End diff --

So this pushes the pod spec yml from the spark-submit process's local disk 
up to the driver pod. It may be worthwhile to support specifying the file as a 
location in the driver pod that hasn't been mounted by spark-submit, but I 
think doing it this way is fine for now.


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211800379
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -96,3 +112,25 @@ private[spark] class KubernetesDriverBuilder(
 spec
   }
 }
+
+private[spark] object KubernetesDriverBuilder extends Logging {
+  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): 
KubernetesDriverBuilder = {
+conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+  .map(new File(_))
+  .map(file => new KubernetesDriverBuilder(provideInitialSpec = conf 
=> {
+try {
+  val sparkPod = KubernetesUtils.loadPodFromTemplate(
+kubernetesClient,
+file,
+Constants.DRIVER_CONTAINER_NAME)
--- End diff --

You may `import org.apache.spark.deploy.k8s.Constants._` at the top of the 
file and then not need the `Constants` prefix here.


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211800127
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+
+import org.apache.spark.deploy.k8s._
+
+private[spark] class TemplateVolumeStep(
+   conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  def configurePod(pod: SparkPod): SparkPod = {
+
require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
+val podTemplateFile = 
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+val podWithVolume = new PodBuilder(pod.pod)
+  .editSpec()
--- End diff --

Match the indentation here with the indentation style down below.


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211800821
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -51,7 +57,13 @@ private[spark] class KubernetesDriverBuilder(
 provideJavaStep: (
   KubernetesConf[KubernetesDriverSpecificConf]
 => JavaDriverFeatureStep) =
-new JavaDriverFeatureStep(_)) {
+new JavaDriverFeatureStep(_),
+provideTemplateVolumeStep: (KubernetesConf[_ <: 
KubernetesRoleSpecificConf]
+  => TemplateVolumeStep) =
+new TemplateVolumeStep(_),
+provideInitialSpec: KubernetesConf[KubernetesDriverSpecificConf]
--- End diff --

Why not `provideInitialPod` to be consistent with the executor builder?


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211799242
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 ---
@@ -59,5 +65,23 @@ private[spark] object KubernetesUtils {
 }
   }
 
+  def loadPodFromTemplate(kubernetesClient: KubernetesClient,
+  templateFile: File,
+  containerName: String): SparkPod = {
+try {
+  val pod = kubernetesClient.pods().load(templateFile).get()
+  val container = pod.getSpec.getContainers.asScala
+.filter(_.getName == containerName)
--- End diff --

Can use `require(...exists)`
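
That is, something like this sketch against the code above:

```
// Fail fast with a clear message if no container matches, instead of
// letting an empty result blow up later.
val containers = pod.getSpec.getContainers.asScala
require(containers.exists(_.getName == containerName),
  s"Pod template must contain a container named '$containerName'")
val container = containers.find(_.getName == containerName).get
```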


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211801962
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+
+import org.apache.spark.deploy.k8s._
+
+private[spark] class TemplateVolumeStep(
+   conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  def configurePod(pod: SparkPod): SparkPod = {
+
require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
+val podTemplateFile = 
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+val podWithVolume = new PodBuilder(pod.pod)
+  .editSpec()
+  .addNewVolume()
+  .withName(Constants.POD_TEMPLATE_VOLUME)
+  .withHostPath(new HostPathVolumeSource(podTemplateFile))
--- End diff --

`hostPath` is not the correct volume type here. Instead, do the following:

- Override `getAdditionalKubernetesResources()` with the following (sketched below):
  1. Load the contents of the template file from this process's local disk into a UTF-8 String
  2. Create and return a `ConfigMap` object containing the contents of that file, under some given key
- In `configurePod`, add the config map as a volume in the pod spec, and add a volume mount pointing to that volume as done here.
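
A hedged sketch of that shape (fabric8 builder API; `POD_TEMPLATE_CONFIGMAP` and `POD_TEMPLATE_KEY` are assumed constants, not existing ones):

```
// Load the template off local disk and ship it as a ConfigMap.
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  val templateContents = Files.toString(templateFile, StandardCharsets.UTF_8)
  Seq(new ConfigMapBuilder()
    .withNewMetadata()
      .withName(POD_TEMPLATE_CONFIGMAP)
      .endMetadata()
    .addToData(POD_TEMPLATE_KEY, templateContents)
    .build())
}
```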


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211800415
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -16,11 +16,17 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, 
KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, 
DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, 
EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, 
MountVolumesFeatureStep}
+import java.io.File
+
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.features._
--- End diff --

Undo these import changes. Keep the ordering correct, but import each class 
individually.


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211800326
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -96,3 +112,25 @@ private[spark] class KubernetesDriverBuilder(
 spec
   }
 }
+
+private[spark] object KubernetesDriverBuilder extends Logging {
+  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): 
KubernetesDriverBuilder = {
+conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+  .map(new File(_))
+  .map(file => new KubernetesDriverBuilder(provideInitialSpec = conf 
=> {
+try {
+  val sparkPod = KubernetesUtils.loadPodFromTemplate(
+kubernetesClient,
+file,
+Constants.DRIVER_CONTAINER_NAME)
--- End diff --

Unclear if these container names should be configurable.


---




[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files

2018-08-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r211800176
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+
+import org.apache.spark.deploy.k8s._
+
+private[spark] class TemplateVolumeStep(
+   conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  def configurePod(pod: SparkPod): SparkPod = {
+
require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
+val podTemplateFile = 
conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+val podWithVolume = new PodBuilder(pod.pod)
+  .editSpec()
+  .addNewVolume()
+  .withName(Constants.POD_TEMPLATE_VOLUME)
+  .withHostPath(new HostPathVolumeSource(podTemplateFile))
+  .endVolume()
+  .endSpec()
+  .build()
+
+val containerWithVolume = new ContainerBuilder(pod.container)
+.withVolumeMounts(new VolumeMountBuilder()
--- End diff --

`addNewVolumeMount()`
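
That is (sketch; the mount path is illustrative):

```
val containerWithVolume = new ContainerBuilder(pod.container)
  .addNewVolumeMount()
    .withName(Constants.POD_TEMPLATE_VOLUME)
    .withMountPath("/opt/spark/pod-template") // illustrative path
    .endVolumeMount()
  .build()
```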


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-22 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212126595
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -51,7 +57,13 @@ private[spark] class KubernetesDriverBuilder(
 provideJavaStep: (
   KubernetesConf[KubernetesDriverSpecificConf]
 => JavaDriverFeatureStep) =
-new JavaDriverFeatureStep(_)) {
+new JavaDriverFeatureStep(_),
+provideTemplateVolumeStep: (KubernetesConf[_ <: 
KubernetesRoleSpecificConf]
+  => TemplateVolumeStep) =
+new TemplateVolumeStep(_),
+provideInitialSpec: KubernetesConf[KubernetesDriverSpecificConf]
--- End diff --

You can make the initial pod and wrap it in the `KubernetesDriverSpec` 
object.


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212396719
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{Config => _, _}
--- End diff --

Instead of doing this, just import `Config`, then, do this import: `import 
org.apache.spark.deploy.k8s.Config._`. Also, wildcard-import constants: `import 
org.apache.spark.deploy.k8s.Constants._`.


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212396608
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+
+import org.apache.spark.deploy.k8s._
--- End diff --

Do not wildcard import the package.


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212400281
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -81,13 +88,17 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
   .build[java.lang.Long, java.lang.Long]()
 val executorPodsLifecycleEventHandler = new 
ExecutorPodsLifecycleManager(
   sc.conf,
-  new KubernetesExecutorBuilder(),
+  KubernetesExecutorBuilder(kubernetesClient, sc.conf),
--- End diff --

I double-checked and I don't think we use this variable inside 
`ExecutorPodsLifecycleManager`; can you remove it?


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212395959
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -225,6 +225,18 @@ private[spark] object Config extends Logging {
 "Ensure that major Python version is either Python2 or Python3")
   .createWithDefault("2")
 
+  val KUBERNETES_DRIVER_CONTAINER_NAME =
+ConfigBuilder("spark.kubernetes.driver.containerName")
--- End diff --

I prefer to make it explicit, rather than e.g. "pick the first container in 
the list".


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212396387
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 ---
@@ -59,5 +65,21 @@ private[spark] object KubernetesUtils {
 }
   }
 
+  def loadPodFromTemplate(kubernetesClient: KubernetesClient,
--- End diff --

IDEs don't handle this indentation properly I think - you want this:

```
def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File,
containerName: String): SparkPod = {
```


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212448189
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -225,6 +225,18 @@ private[spark] object Config extends Logging {
 "Ensure that major Python version is either Python2 or Python3")
   .createWithDefault("2")
 
+  val KUBERNETES_DRIVER_CONTAINER_NAME =
+ConfigBuilder("spark.kubernetes.driver.containerName")
--- End diff --

This feature should support using multiple containers, in which case the 
user needs to specify which container is running the Spark process. Using a 
configuration option for that seems like the most straightforward solution.
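For illustration, a sketch of what the completed config entry might look like; the first two lines come from the diff above, while the `.doc`, `.stringConf`, and `.createOptional` parts are my assumption about how it would be finished:

```
val KUBERNETES_DRIVER_CONTAINER_NAME =
  ConfigBuilder("spark.kubernetes.driver.containerName")
    .doc("Name of the container in the pod template that runs the Spark driver process.")
    .stringConf
    .createOptional
```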


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-08-24 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r212722242
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -225,6 +225,18 @@ private[spark] object Config extends Logging {
 "Ensure that major Python version is either Python2 or Python3")
   .createWithDefault("2")
 
+  val KUBERNETES_DRIVER_CONTAINER_NAME =
+ConfigBuilder("spark.kubernetes.driver.containerName")
--- End diff --

Is there confusion because there's an _existing_ configuration option also? 
I think the existing configuration option sets the driver container name when 
no yml is specified. But perhaps the interpretation of this configuration value 
should change when the pod template is provided.


---




[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s

2018-08-17 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21584
  
I filed https://issues.apache.org/jira/browse/SPARK-25152 for the 
integration tests.


---




[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s

2018-08-17 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21584
  
Ok I am merging this to master now. Thanks for the work on this!


---




[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files

2018-08-22 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22146
  
@yifeih can we also modify `docs/running-on-kubernetes.md` and have a 
section specific to using this feature? Mark it as experimental also.


---




[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files

2018-08-22 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22146
  
Jenkins, ok to test


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
> Yes, the service gets its endpoints by matching its label selector 
against labels on the pods, so it's critical to have matching labels. Another 
tenable solution is for the driver backend code to get the pod object of the 
driver and set the label selector of the service based on the labels on the 
driver pod.

The problem is that the driver's labels might not be unique to that driver, 
which therefore would require the user to assign their own unique labels or for 
us to patch the driver pod in-place to assign it a unique label. Neither seem 
ideal.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Taking a step back, it seems unwise to make any assumptions about the 
location in which a driver is running in client mode. Client mode simply says 
that the application driver is running "locally", wherever locally is. That is 
why I also suggested removing the driver's knowledge of the driver pod name and 
removing the owner reference concept entirely. As soon as we start to make 
assumptions in one place, it may (though not necessarily) encourage us to rely 
on those assumptions to create more forks and complexities in the code 
elsewhere. That's a precedent we likely do not want to set.

> Yes, and that's what my point above is about. Regardless of how the 
driver pod is created and managed, users need to make sure it contains a label 
that is unique for the pod/service combination to work.

In one case the requirement for the unique label is explicit - the user has 
created their service and their pod manually and has assigned the labels 
accordingly. In the other case, the user must know to assign the unique label, 
but they would only know to do so from having read the documentation on deploying
Spark. Explicit requirements that are well defined by the API are preferable to 
implicit requirements.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Anyone know what's happening with this:

```
[error] /home/jenkins/workspace/testing-k8s-prb-make-spark-distribution-unified/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala:39: Symbol 'term org.scalacheck' is missing from the classpath.
[error] This symbol is required by 'method org.scalatest.prop.Configuration.getParams'.
[error] Make sure that term scalacheck is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'Configuration.class' was compiled against an incompatible version of org.
[error] serializer.deserialize[Any](serialized) match {
[error]^
[error] one error found
[error] Compile failed at Jul 20, 2018 12:33:14 PM [1.370s]
```

@shaneknapp 


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
We discussed this offline. After some experimentation, we concluded that 
it's not actually straightforward to set up the headless service in the 
Kubernetes scheduler code in client mode, which would be where we'd have to put 
this code. The problem is that before the scheduler starts up, the driver needs 
to bind to the host given by `spark.driver.host`. But if that hostname is tied 
to a headless service and the headless service does not exist, the hostname 
bind will fail.

There are a few ways to work around this, but all of them seem risky to put 
in a first pass at this patch.

Additionally, it's not clear if the scheduler code should be opinionated 
about configuring network connectivity for the driver. Client mode is just a 
definition of the driver running in a local process - Spark currently doesn't 
make any assumptions about what that local process is, whether it's in a 
Kubernetes pod or not. Contrast this with cluster mode where the driver should 
know it's running in a pod as submitted by `KubernetesClientApplication`, and 
`KubernetesClientApplication` can know to set up its headless service.

Therefore we are not going to have the driver set up the headless service 
in this patch. If we decide later on that creating the headless service is the 
right thing to do, we can introduce that functionality in a separate patch.

I'm going to update the docs and hope to merge this by the end of the day 
on Monday, July 23. Let us know if there are any additional concerns. Thanks 
@liyinan926 @echarles.
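For reference, a minimal sketch of what a client-mode driver would configure once the user has created the headless service themselves (the service name, namespace, and port below are hypothetical):

```
import org.apache.spark.{SparkConf, SparkContext}

// Assumes a headless service named "spark-driver-svc" already exists
// and selects (only) the driver pod.
val conf = new SparkConf()
  .setAppName("client-mode-example")
  .setMaster("k8s://https://kubernetes.default.svc")
  .set("spark.driver.host", "spark-driver-svc.default.svc.cluster.local")
  .set("spark.driver.port", "7078")
val sc = new SparkContext(conf)
```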


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
test this please


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Never mind, I think it's recovering now.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-20 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
@liyinan926 I did some of my own edits on top of your suggestions for docs 
wording in the latest patch.


---




[GitHub] spark pull request #21800: [SPARK-24825][K8S][TEST] Kubernetes integration t...

2018-07-17 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/21800

[SPARK-24825][K8S][TEST] Kubernetes integration tests build the whole 
reactor

## What changes were proposed in this pull request?

Make the integration test script build all modules.

In order to not run all the non-Kubernetes integration tests in the build, 
support specifying tags and tag all integration tests specifically with "k8s". 
Supply the k8s tag in the dev/dev-run-integration-tests.sh script.
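For illustration, tagging a ScalaTest suite this way might look like the sketch below (the tag object and suite names are hypothetical):

```
import org.scalatest.{FunSuite, Tag}

// Tag carried by every Kubernetes integration test.
object K8sTag extends Tag("k8s")

class ExampleK8sSuite extends FunSuite {
  // Only selected when the build requests the "k8s" tag.
  test("example kubernetes integration test", K8sTag) {
    assert(1 + 1 == 2)
  }
}
```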

## How was this patch tested?

The build system will test this.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/palantir/spark k8s-integration-tests-maven-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21800.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21800


commit 6a89c658c6a56c34d5e0578f09db77b4ed9b41f3
Author: mcheah 
Date:   2018-07-17T23:58:36Z

[SPARK-24825][K8S][TEST] Kubernetes integration tests build the whole 
reactor.

In order to not run all the non-Kubernetes integration tests in the build, 
support specifying tags and tag all integration tests specifically with "k8s". 
Supply the k8s tag in the dev/dev-run-integration-tests.sh script.




---




[GitHub] spark issue #21800: [SPARK-24825][K8S][TEST] Kubernetes integration tests bu...

2018-07-17 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21800
  
test this please


---




[GitHub] spark issue #21800: [SPARK-24825][K8S][TEST] Kubernetes integration tests bu...

2018-07-17 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21800
  
Addressed comments


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
@echarles it's highly likely your pods cannot communicate with your driver's 
host because of a network partition or a firewall issue. The specific steps to 
fix that will depend in large part on how your hardware is set up. I don't know 
how much specific guidance I can give without knowing your networking layout, 
but some network diagnostics tooling would help. I would suggest, for example, 
running a bash pod, exec-ing into it, and pinging the host your driver would be 
running on - run some instrumentation and go from there.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
> agreed with @echarles that it would be great if the submission client 
would still create a headless service for the driver if the driver is running in 
a pod in client mode.

The goal of this approach that specifically does not create a headless 
service is so that the client mode implementation here is identical to the 
client mode implementation of the other cluster managers. That is to say, the 
other cluster managers for client mode don't make any assumptions about 
connectivity, nor do they set up their own networking layers. Therefore I think 
it's appropriate here to avoid the extra complexity.

> my pod has access to my host, so there is nothing to do on network level. 
In other words, which are the steps to make this PR work in client mode for 
Out-Cluster (assuming the network connectivity is OK)?

Can you share on how you know that your executor pod has access to your 
host set by `spark.driver.host` and `spark.driver.port` over the network?


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
> Can you point to where in the fork the submission client creates the 
headless service? (just to help me understand the internals)

> Btw If we stick to this manual approach, the need for the manual headless 
service should be documented.

@echarles we create the headless service in spark-submit as part of 
spark-submit only for cluster mode: 
https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala#L69

Note that we only invoke any of the feature steps and the entry point of 
`KubernetesClientApplication` if we run in cluster mode. If we run in client 
mode, we enter directly into the user's main class, or, the user is in a 
process that just created a Spark context from scratch with the right master 
URL (i.e. `new SparkContext(k8s://my-master:8443)`). If you wanted to create 
the headless service in client mode, you'd have to do it when instantiating the 
`KubernetesClusterSchedulerBackend` somewhere, probably before creating the 
`ExecutorPodsAllocator` so that you'd set `spark.driver.host` and 
`spark.driver.port` properly when telling the created executors where to find 
the driver via the `--driver-url` argument.

I've deferred implementing this code path. The documentation for using a 
headless service is only suggested, but not mentioned as a hard requirement: 
https://github.com/apache/spark/pull/21748/files#diff-b5527f236b253e0d9f5db5164bdb43e9R131.
 I didn't put this as a hard requirement because I could imagine some users 
wanting to not specifically use a headless service for this; perhaps they want 
to use a full Service object and share that service object with ports to be 
exposed for other reachable endpoints their pod exposes.
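To make the headless-service suggestion concrete, here is a rough sketch of creating one with the fabric8 client; this is not code that exists in Spark, and the service name, namespace, label value, and port are all illustrative:

```
import io.fabric8.kubernetes.api.model.ServiceBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

val client = new DefaultKubernetesClient()
val driverService = new ServiceBuilder()
  .withNewMetadata()
    .withName("spark-driver-svc")
  .endMetadata()
  .withNewSpec()
    .withClusterIP("None") // headless: DNS resolves directly to the pod IP
    .addToSelector("spark-app-selector", "spark-app-1234") // must match only the driver pod
    .addNewPort()
      .withName("driver-rpc-port")
      .withPort(7078)
      .withNewTargetPort(7078)
    .endPort()
  .endSpec()
  .build()
client.services().inNamespace("default").create(driverService)
```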


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
@liyinan926 I wonder if the pod-name owner reference should still be a thing 
in client mode. For example, what if the given pod name is accidentally one that 
is assigned to a different running pod in that namespace? It seems simpler to 
require the pod name to be set only in cluster mode; in client mode, the user is 
responsible for their own cleanup.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
> there was a change in Spark recently in how the driver self-discovered 
its hostname by default, if I am not mistaken. Can't recall the exact patch. I 
remember that change specifically prompting us to set up the headless service 
for cluster mode.

Not exactly recently, but at some point after we started work in mainline.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
@echarles then you'd probably want more information such as the logs of the 
executors, though I'd imagine that one would have trouble getting those given 
that the executor exits so quickly. But maybe a `kubectl get pods -n <namespace>` 
would help.

[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Also, make sure your driver can actually bind the port specified by 
`spark.driver.port`.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
@echarles there was a change in Spark recently in how the driver 
self-discovered its hostname by default, if I am not mistaken. Can't recall the 
exact patch. I remember that change specifically prompting us to set up the 
headless service for cluster mode.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
My guess is that the executor is failing to contact the driver - I don't 
think you can use the pod name as the hostname in minikube. Can you create a 
headless service to bind your pod to a stable hostname and try again?


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-16 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Comments are addressed.


---




[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.

2018-07-16 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21748#discussion_r202801155
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -120,8 +120,8 @@ This URI is the location of the example jar that is 
already in the Docker image.
 ## Client Mode
 
 Starting with Spark 2.4.0, it is possible to run Spark applications on 
Kubernetes in client mode. When running a Spark
-application in client mode, a separate pod is not deployed to run the 
driver. When running an application in
-client mode, it is recommended to account for the following factors:
+application in client mode, a separate pod is not deployed to run the 
driver. Your Spark driver does not need to run in
+a Kubernetes pod. When running an application in client mode, it is 
recommended to account for the following factors:
--- End diff --

Think that's a bit wordy - perhaps, "When your application runs in client 
mode, the driver process is run locally. The driver can run inside a pod or on 
a physical host."


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-16 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Something's misbehaving in the integration tests, as the integration test 
build isn't able to track dependencies from other modules in the Spark project. 
Instead, the integration test tries to download its Spark dependencies from 
Maven central, which is incorrect. `mvn install` is only a temporary hacky 
solution.

However, we should track the integration test issue separately. Can we 
get a final sign-off here and merge? @liyinan926 @felixcheung 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-16 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Filed https://issues.apache.org/jira/browse/SPARK-24825 to track the test 
problem.


---




[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.

2018-07-16 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21748#discussion_r202772018
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -399,18 +426,18 @@ specific to Spark on Kubernetes.
   
 Path to the CA cert file for connecting to the Kubernetes API server 
over TLS from the driver pod when requesting
 executors. This file must be located on the submitting machine's disk, 
and will be uploaded to the driver pod.
-Specify this as a path as opposed to a URI (i.e. do not provide a 
scheme).
+Specify this as a path as opposed to a URI (i.e. do not provide a 
scheme). In client mode, use
+spark.kubernetes.authenticate.caCertFile instead.
   
 
 
   spark.kubernetes.authenticate.driver.clientKeyFile
   (none)
   
 Path to the client key file for authenticating against the Kubernetes 
API server from the driver pod when requesting
-executors. This file must be located on the submitting machine's disk, 
and will be uploaded to the driver pod.
-Specify this as a path as opposed to a URI (i.e. do not provide a 
scheme). If this is specified, it is highly
-recommended to set up TLS for the driver submission server, as this 
value is sensitive information that would be
-passed to the driver pod in plaintext otherwise.
--- End diff --

This comment is very much out of date, actually; it refers to an old 
version of this code where we didn't use K8s secrets to push this data but 
instead used a custom HTTP server mounted on the driver. It's from before this 
was merged into apache/master.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-23 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Merging in a few hours if no additional comments are raised.


---




[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.

2018-07-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21748#discussion_r205177861
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, 
`--master k8s://http://127.0.0.
 spark-submit. Finally, notice that in the above example we specify a jar 
with a specific URI with a scheme of `local://`.
 This URI is the location of the example jar that is already in the Docker 
image.
 
+## Client Mode
+
+Starting with Spark 2.4.0, it is possible to run Spark applications on 
Kubernetes in client mode. When your application
+runs in client mode, the driver can run inside a pod or on a physical 
host. When running an application in client mode,
+it is recommended to account for the following factors:
+
+### Client Mode Networking
+
+Spark executors must be able to connect to the Spark driver over a 
hostname and a port that is routable from the Spark
+executors. The specific network configuration that will be required for 
Spark to work in client mode will vary per
+setup. If you run your driver inside a Kubernetes pod, you can use a
+[headless 
service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services)
 to allow your
+driver pod to be routable from the executors by a stable hostname. When 
deploying your headless service, ensure that
+the service's label selector will only match the driver pod and no other 
pods; it is recommended to assign your driver
+pod a sufficiently unique label and to use that label in the label 
selector of the headless service. Specify the driver's
+hostname via `spark.driver.host` and your spark driver's port to 
`spark.driver.port`.
--- End diff --

Yeah, manual setup is fine for now. I think additional docs around how to do 
all this can be a separate PR.


---




[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.

2018-07-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21748#discussion_r205178092
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, 
`--master k8s://http://127.0.0.
 spark-submit. Finally, notice that in the above example we specify a jar 
with a specific URI with a scheme of `local://`.
 This URI is the location of the example jar that is already in the Docker 
image.
 
+## Client Mode
+
+Starting with Spark 2.4.0, it is possible to run Spark applications on 
Kubernetes in client mode. When your application
+runs in client mode, the driver can run inside a pod or on a physical 
host. When running an application in client mode,
+it is recommended to account for the following factors:
+
+### Client Mode Networking
+
+Spark executors must be able to connect to the Spark driver over a 
hostname and a port that is routable from the Spark
+executors. The specific network configuration that will be required for 
Spark to work in client mode will vary per
+setup. If you run your driver inside a Kubernetes pod, you can use a
+[headless 
service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services)
 to allow your
+driver pod to be routable from the executors by a stable hostname. When 
deploying your headless service, ensure that
+the service's label selector will only match the driver pod and no other 
pods; it is recommended to assign your driver
+pod a sufficiently unique label and to use that label in the label 
selector of the headless service. Specify the driver's
+hostname via `spark.driver.host` and your spark driver's port to 
`spark.driver.port`.
+
+### Client Mode Executor Pod Garbage Collection
+
+If you run your Spark driver in a pod, it is highly recommended to set 
`spark.driver.pod.name` to the name of that pod.
+When this property is set, the Spark scheduler will deploy the executor 
pods with an

+[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/),
 which in turn will
+ensure that once the driver pod is deleted from the cluster, all of the 
application's executor pods will also be deleted.
+The driver will look for a pod with the given name in the namespace 
specified by `spark.kubernetes.namespace`, and
+an OwnerReference pointing to that pod will be added to each executor 
pod's OwnerReferences list. Be careful to avoid
+setting the OwnerReference to a pod that is not actually that driver pod, 
or else the executors may be terminated
+prematurely when the wrong pod is deleted.
+
+If your application is not running inside a pod, or if 
`spark.driver.pod.name` is not set when your application is
+actually running in a pod, keep in mind that the executor pods may not be 
properly deleted from the cluster when the
+application exits. The Spark scheduler attempts to delete these pods, but 
if the network request to the API server fails
+for any reason, these pods will remain in the cluster. The executor 
processes should exit when they cannot reach the
+driver, so the executor pods should not consume compute resources (cpu and 
memory) in the cluster after your application
--- End diff --

Unclear; it triggers in the `onDisconnected` event, so I think a dropped 
persistent socket connection causes the exit. It should be more or less 
instantaneous.
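The mechanism is roughly the shape sketched below. This is a simplified illustration of the executor backend's behavior, not Spark's actual class, and it assumes code living inside Spark's own packages since the RPC API is private[spark]:

```
import org.apache.spark.rpc.{RpcAddress, RpcEnv, ThreadSafeRpcEndpoint}

// When the persistent connection to the driver drops, onDisconnected
// fires and the executor shuts itself down almost immediately.
class DriverWatchEndpoint(override val rpcEnv: RpcEnv, driverAddress: RpcAddress)
  extends ThreadSafeRpcEndpoint {

  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
    if (remoteAddress == driverAddress) {
      System.exit(1) // stop consuming cpu and memory in the cluster
    }
  }
}
```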


---




[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.

2018-07-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21748#discussion_r205178769
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -35,26 +35,39 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
   override def canCreate(masterURL: String): Boolean = 
masterURL.startsWith("k8s")
 
   override def createTaskScheduler(sc: SparkContext, masterURL: String): 
TaskScheduler = {
-if (masterURL.startsWith("k8s") &&
-  sc.deployMode == "client" &&
-  !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
-  throw new SparkException("Client mode is currently not supported for 
Kubernetes.")
-}
-
 new TaskSchedulerImpl(sc)
   }
 
   override def createSchedulerBackend(
   sc: SparkContext,
   masterURL: String,
   scheduler: TaskScheduler): SchedulerBackend = {
+val wasSparkSubmittedInClusterMode = 
sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
+val (authConfPrefix,
+  apiServerUri,
+  defaultServiceAccountToken,
+  defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) {
+  require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined,
+"If the application is deployed using spark-submit in cluster 
mode, the driver pod name " +
+  "must be provided.")
+  (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+KUBERNETES_MASTER_INTERNAL_URL,
+Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+} else {
+  (KUBERNETES_AUTH_CLIENT_MODE_PREFIX,
+masterURL.substring("k8s://".length()),
--- End diff --

We can make such a helper function; currently this logic is duplicated here 
and in `KubernetesClientApplication`.
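Something as small as the helper below would do; indeed, a `parseMasterUrl` of exactly this shape appears in `KubernetesUtils` in a later diff in this archive:

```
// Strip the "k8s://" prefix in exactly one place instead of two.
def parseMasterUrl(url: String): String = url.substring("k8s://".length)
```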


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-25 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Ok after the next build passes I'm going to merge immediately. Thanks for 
the review.


---




[GitHub] spark pull request #21874: [SPARK-23146][K8S][TESTS] Enable client mode inte...

2018-07-25 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/21874

[SPARK-23146][K8S][TESTS] Enable client mode integration test.

## What changes were proposed in this pull request?

Enable client mode integration test after merging from master.

## How was this patch tested?

Check the integration test runs in the build.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/palantir/spark enable-client-mode-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21874


commit 3fd888901a01e2d9cf3903c7888afb39d93214b2
Author: mcheah 
Date:   2018-07-25T18:24:12Z

[SPARK-23146][K8S][TESTS] Enable client mode integration test.




---




[GitHub] spark issue #21874: [SPARK-23146][K8S][TESTS] Enable client mode integration...

2018-07-25 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21874
  
ok to test


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
@echarles I don't think we should be special-casing Kubernetes here as 
being any different from the other cluster managers. The main point of client 
mode is that the driver is running locally and we make no assumptions about 
network connectivity. Deploying notebooks on other cluster managers which have 
network isolation between the driver and the executors would require the user 
to manually configure their own connectivity too.

There's another problem in creating the headless service here: when you 
create a service, you have to back that service with pods that have certain 
labels; that is, a label selector. However, labels are not unique for a given 
pod; multiple pods can have the same label key-value pairs. Now, when Spark 
Submit has explicit control over both the pod that is created and the service, 
we give the driver pod a unique label corresponding to an application id, so the 
service is guaranteed to be backed by only that driver pod. If we don't control 
the creation of the driver pod, we can't 
guarantee that whatever service we create would route to only the driver pod 
running the application in question.

It is therefore unclear what the label selector would be for the headless 
service we create automatically. You could allow the user to have some control 
over the configuration of the headless service. For example, we could have a 
setting indicating the label selector the headless service should be created 
with. Or, the documentation can specifically require the driver pod to have a 
set of labels which are strictly unique to that pod in that namespace. In both 
cases, I would argue that we end up with at least as much complexity and mental 
burden as having the user deploy a headless service that gives their driver pod 
(and only that driver pod) a stable hostname.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Though I suppose you could have the driver patch its own metadata fields to 
assign itself a unique label. I could see that being confusing to users when 
they observe that their driver pod metadata is modified from what they 
originally deployed. Nevertheless, it's a tenable option.
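A hypothetical sketch of that self-labeling idea with the fabric8 client; again, this is not something Spark actually does, and the names below are illustrative:

```
import io.fabric8.kubernetes.client.DefaultKubernetesClient

val client = new DefaultKubernetesClient()
val driverPodName = "my-driver-pod"  // illustrative
val applicationId = "spark-app-1234" // illustrative

// The driver patches its own pod to carry a label unique to this application.
client.pods()
  .inNamespace("default")
  .withName(driverPodName)
  .edit()
  .editMetadata()
    .addToLabels("spark-app-selector", applicationId)
  .endMetadata()
  .done()
```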


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
> About selecting the pod with labels, another approach I have taken is 
simply using the name of the driver pod, a bit like I have done with the 
following deployment (so no need to ensure labels - the ports are the ports 
assigned by spark that the code can retrieve).

I don't think you can back a service with a selector that's a pod's name, 
but someone with more knowledge of the Service API might be able to correct me 
here. I was under the impression one had to use labels. In your example, the 
service would match any pod with the label key of `run` being equal to 
`spark-pod`, which isn't guaranteed to map to a single unique pod. In 
spark-submit we set `spark-app-id` to a unique identifier.

> If I compare with yarn-client with all nodes on the same LAN

But if you run a YARN application with the driver not on the same network, the 
user has to set up their own connectivity too. In Kubernetes that kind of 
networking setup perhaps comes up more often, but that's not enough reason to 
introduce this complexity.

Another situation where we want the driver to not be making the headless 
service is in a world where the driver shouldn't have permission to create 
service objects, but can have permission to create pod objects. Adding a flag 
allowing the driver to create the headless service would implicitly change the 
required permissions of the application. This is more work to document and more 
for the application writer to consider.


---




[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.

2018-07-18 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21748
  
Sounds fine. How does the documentation look now in that regard?


---




[GitHub] spark issue #21862: [SPARK-24903][K8s] Make driver container name configurab...

2018-07-24 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21862
  
We're holding off on all additional pod-modification feature requests in favor 
of work on https://issues.apache.org/jira/browse/SPARK-24434, which would allow 
pods to be customized arbitrarily.


---




[GitHub] spark pull request #22256: [SPARK-25262][K8S] Better support configurability...

2018-08-29 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22256#discussion_r213771635
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
 ---
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
 .orElse(conf.getOption("spark.local.dir"))
 .getOrElse(defaultLocalDir)
 .split(",")
+  private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
 
   override def configurePod(pod: SparkPod): SparkPod = {
 val localDirVolumes = resolvedLocalDirs
   .zipWithIndex
   .map { case (localDir, index) =>
-new VolumeBuilder()
-  .withName(s"spark-local-dir-${index + 1}")
-  .withNewEmptyDir()
-  .endEmptyDir()
-  .build()
+val name = s"spark-local-dir-${index + 1}"
+// To allow customisation of local dirs backing volumes we should 
avoid creating
+// emptyDir volumes if the volume is already defined in the pod 
spec
+hasVolume(pod, name) match {
+  case true =>
+// For pre-existing volume definitions just re-use the volume
+pod.pod.getSpec().getVolumes().asScala.find(v => 
v.getName.equals(name)).get
+  case false =>
+// Create new emptyDir volume
+new VolumeBuilder()
+  .withName(name)
+  .withNewEmptyDir()
+.withMedium(useLocalDirTmpFs match {
+  case true => "Memory" // Use tmpfs
+  case false => null// Default - use nodes backing 
storage
+})
+  .endEmptyDir()
+  .build()
+}
   }
+
 val localDirVolumeMounts = localDirVolumes
   .zip(resolvedLocalDirs)
   .map { case (localDirVolume, localDirPath) =>
-new VolumeMountBuilder()
-  .withName(localDirVolume.getName)
-  .withMountPath(localDirPath)
-  .build()
+hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
+  case true =>
+// For pre-existing volume mounts just re-use the mount
+pod.container.getVolumeMounts().asScala
+  .find(m => m.getName.equals(localDirVolume.getName)
+ && m.getMountPath.equals(localDirPath))
+  .get
+  case false =>
+// Create new volume mount
+new VolumeMountBuilder()
+  .withName (localDirVolume.getName)
+  .withMountPath (localDirPath)
+  .build()
+}
+  }
+
+// Check for conflicting volume mounts
+for (m: VolumeMount <- localDirVolumeMounts) {
+  if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 
0) {
+throw new SparkException(s"Conflicting volume mounts defined, pod 
template attempted to " +
+  "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at 
an alternative path " +
+  "then the expected ${m.getPath}")
   }
+}
+
 val podWithLocalDirVolumes = new PodBuilder(pod.pod)
   .editSpec()
-.addToVolumes(localDirVolumes: _*)
+ // Don't want to re-add volumes that already existed in the 
incoming spec
+ // as duplicate definitions will lead to K8S API errors
+.addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, 
v.getName)): _*)
--- End diff --

All of this conflicting-volume and conflicting-volume-mount handling seems out 
of place here. If we're anticipating the pod template file, keep in mind that 
the pod template feature is specifically not designed to do any validation. 
What kinds of errors are we hoping to avoid by doing the deduplication here?
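For reference, the `hasVolume` check the diff leans on presumably boils down to something like this reconstruction (not the PR's exact code):

```
import scala.collection.JavaConverters._
import org.apache.spark.deploy.k8s.SparkPod

// True if the incoming pod spec already defines a volume with this name.
def hasVolume(pod: SparkPod, name: String): Boolean =
  pod.pod.getSpec.getVolumes.asScala.exists(_.getName == name)
```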


---




[GitHub] spark pull request #22256: [SPARK-25262][K8S] Better support configurability...

2018-08-30 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22256#discussion_r214107006
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
 ---
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
 .orElse(conf.getOption("spark.local.dir"))
 .getOrElse(defaultLocalDir)
 .split(",")
+  private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
 
   override def configurePod(pod: SparkPod): SparkPod = {
 val localDirVolumes = resolvedLocalDirs
   .zipWithIndex
   .map { case (localDir, index) =>
-new VolumeBuilder()
-  .withName(s"spark-local-dir-${index + 1}")
-  .withNewEmptyDir()
-  .endEmptyDir()
-  .build()
+val name = s"spark-local-dir-${index + 1}"
+// To allow customisation of local dirs backing volumes we should 
avoid creating
+// emptyDir volumes if the volume is already defined in the pod 
spec
+hasVolume(pod, name) match {
+  case true =>
+// For pre-existing volume definitions just re-use the volume
+pod.pod.getSpec().getVolumes().asScala.find(v => 
v.getName.equals(name)).get
+  case false =>
+// Create new emptyDir volume
+new VolumeBuilder()
+  .withName(name)
+  .withNewEmptyDir()
+.withMedium(useLocalDirTmpFs match {
+  case true => "Memory" // Use tmpfs
+  case false => null// Default - use nodes backing 
storage
+})
+  .endEmptyDir()
+  .build()
+}
   }
+
 val localDirVolumeMounts = localDirVolumes
   .zip(resolvedLocalDirs)
   .map { case (localDirVolume, localDirPath) =>
-new VolumeMountBuilder()
-  .withName(localDirVolume.getName)
-  .withMountPath(localDirPath)
-  .build()
+hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
+  case true =>
+// For pre-existing volume mounts just re-use the mount
+pod.container.getVolumeMounts().asScala
+  .find(m => m.getName.equals(localDirVolume.getName)
+ && m.getMountPath.equals(localDirPath))
+  .get
+  case false =>
+// Create new volume mount
+new VolumeMountBuilder()
+  .withName (localDirVolume.getName)
+  .withMountPath (localDirPath)
+  .build()
+}
+  }
+
+// Check for conflicting volume mounts
+for (m: VolumeMount <- localDirVolumeMounts) {
+  if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 
0) {
+throw new SparkException(s"Conflicting volume mounts defined, pod 
template attempted to " +
+  "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at 
an alternative path " +
+  "then the expected ${m.getPath}")
   }
+}
+
 val podWithLocalDirVolumes = new PodBuilder(pod.pod)
   .editSpec()
-.addToVolumes(localDirVolumes: _*)
+ // Don't want to re-add volumes that already existed in the 
incoming spec
+ // as duplicate definitions will lead to K8S API errors
+.addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, 
v.getName)): _*)
--- End diff --

> This is regardless of whether the template feature is opinionated about 
validation, even if the template feature doesn't do validation, Spark code 
itself should be ensuring that it generates valid specs as far as it is able to.

This is a stance that, as far as I'm aware, we specifically chose not to take 
in the pod template feature. If one is using the pod template feature, Spark 
won't provide any guarantees that the pod it makes will be well-formed. When 
spark-submit deploys the pod to the cluster, the API will return a clear enough 
error informing the user to make the appropriate corrections to their pod 
template.

@onursatici I just checked the pod template files PR; I didn't see this 
specifically called out - should this be documented?


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-31 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22112
  
@cloud-fan @tgravescs was wondering if we could get an ETA on this landing?

Also, I tried running something analogous to the example script from the 
description of https://issues.apache.org/jira/browse/SPARK-23207, but for RDDs. 
However, it did not manifest the correctness problem even before this patch was 
applied. Are there any ways to reliably reproduce this with a minimal script?

The below script is run in my Spark shell, Spark standalone mode 
single-node cluster with 2 workers, client mode, with the external shuffle 
service enabled. It does not reproduce the issue.

```
import scala.sys.process._

import org.apache.spark.TaskContext

val res = sc.parallelize(0 until 1000 * 1000, 1).coalesce(200, shuffle = true).map { x =>
  x
}.coalesce(200, shuffle = true).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f -n java".!!) // Kills the newest Java process, ideally the executors
  }
  x
}
res.distinct().count()
```


---




[GitHub] spark pull request #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.mem...

2018-08-31 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22298#discussion_r214491727
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 ---
@@ -51,6 +51,7 @@ private[spark] class BasicDriverFeatureStep(
 .get(DRIVER_MEMORY_OVERHEAD)
 .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * 
driverMemoryMiB).toInt,
   MEMORY_OVERHEAD_MIN_MIB))
+  // TODO: Have memory limit checks on driverMemory
--- End diff --

Hm, can you elaborate here? We already set the driver memory limit in this 
step based on the overhead.
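For context, the existing limit computation in this step is roughly the sketch below (simplified; the two input values are illustrative stand-ins for the real config reads):

```
import io.fabric8.kubernetes.api.model.QuantityBuilder

val driverMemoryMiB = 1024L   // illustrative
val memoryOverheadMiB = 384L  // illustrative

// The pod-level memory limit already covers heap plus overhead.
val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
val driverMemoryQuantity = new QuantityBuilder(false)
  .withAmount(s"${driverMemoryWithOverheadMiB}Mi")
  .build()
```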


---




[GitHub] spark pull request #22215: [SPARK-25222][K8S] Improve container status loggi...

2018-08-31 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22215#discussion_r214490999
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 ---
@@ -60,4 +64,81 @@ private[spark] object KubernetesUtils {
   }
 
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
+
+  def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : 
String = {
+// Use more loggable format if value is null or empty
+val indentStr = "\t" * indent
--- End diff --

Can we prefer space-based indentation? I'm curious whether others have an 
opinion about this.


---




[GitHub] spark issue #22215: [SPARK-25222][K8S] Improve container status logging

2018-08-31 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22215
  
ok to test


---




[GitHub] spark pull request #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.mem...

2018-08-31 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22298#discussion_r214492080
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala
 ---
@@ -38,7 +39,8 @@ private[spark] class JavaDriverFeatureStep(
   .build()
 SparkPod(pod.pod, withDriverArgs)
   }
-  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
+  override def getAdditionalPodSystemProperties(): Map[String, String] =
--- End diff --

Again, I'm not sure what our direction will be with respect to mixed 
pipelines throughout the cluster manager sections - if we should support them 
in a first-class way, then perhaps we file a JIRA and discuss how the 
submission client should be refactored to support that.


---




[GitHub] spark pull request #22215: [SPARK-25222][K8S] Improve container status loggi...

2018-08-31 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22215#discussion_r214491177
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 ---
@@ -60,4 +64,81 @@ private[spark] object KubernetesUtils {
   }
 
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
+
+  def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : 
String = {
+// Use more loggable format if value is null or empty
+val indentStr = "\t" * indent
+pairs.map {
+  case (k, v) => s"\n$indentStr $k: 
${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
+}.mkString("")
+  }
+
+  /**
+   * Given a pod, output a human readable representation of its state
+   *
+   * @param pod Pod
+   * @return Human readable pod state
+   */
+  def formatPodState(pod: Pod): String = {
+val details = Seq[(String, String)](
+  // pod metadata
+  ("pod name", pod.getMetadata.getName),
+  ("namespace", pod.getMetadata.getNamespace),
+  ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
+  ("pod uid", pod.getMetadata.getUid),
+  ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),
+
+  // spec details
+  ("service account name", pod.getSpec.getServiceAccountName),
+  ("volumes", 
pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")),
+  ("node name", pod.getSpec.getNodeName),
+
+  // status
+  ("start time", formatTime(pod.getStatus.getStartTime)),
+  ("phase", pod.getStatus.getPhase),
+  ("container status", containersDescription(pod, 2))
+)
+
+formatPairsBundle(details)
+  }
+
+  def containersDescription(p: Pod, indent: Int = 1): String = {
+p.getStatus.getContainerStatuses.asScala.map { status =>
+  Seq(
+("container name", status.getName),
+("container image", status.getImage)) ++
+containerStatusDescription(status)
+}.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
+  }
+
+  def containerStatusDescription(containerStatus: ContainerStatus)
+: Seq[(String, String)] = {
+val state = containerStatus.getState
+Option(state.getRunning)
--- End diff --

This is a really cool use of partial functions - I wonder if there are other 
places where we should be matching this way (of course, it doesn't have to be 
done here).
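The pattern being praised is roughly the Option chaining below, condensed from the diff's `containerStatusDescription` (details simplified):

```
import io.fabric8.kubernetes.api.model.ContainerState

def describeState(state: ContainerState): Seq[(String, String)] =
  Option(state.getRunning)
    .map(r => Seq(("container state", "Running"), ("started at", r.getStartedAt)))
    .orElse(Option(state.getWaiting)
      .map(w => Seq(("container state", "Waiting"), ("reason", w.getReason))))
    .orElse(Option(state.getTerminated)
      .map(t => Seq(("container state", "Terminated"), ("exit code", t.getExitCode.toString))))
    .getOrElse(Seq(("container state", "N/A")))
```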


---




[GitHub] spark pull request #22215: [SPARK-25222][K8S] Improve container status loggi...

2018-08-31 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22215#discussion_r214491272
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 ---
@@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   private def exitReasonMessage(podState: FinalPodState, execId: Long, 
exitCode: Int) = {
 val pod = podState.pod
+val reason = Option(pod.getStatus.getReason)
+val message = Option(pod.getStatus.getMessage)
 s"""
|The executor with id $execId exited with exit code $exitCode.
-   |The API gave the following brief reason: ${pod.getStatus.getReason}
-   |The API gave the following message: ${pod.getStatus.getMessage}
+   |The API gave the following brief reason: ${reason.getOrElse("")}
--- End diff --

Maybe default to `N/A`? Being left blank might be confusing.


---




[GitHub] spark pull request #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.mem...

2018-08-31 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22298#discussion_r214492297
  
--- Diff: 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
 ---
@@ -72,12 +72,33 @@ private[spark] trait PythonTestsSuite { k8sSuite: 
KubernetesSuite =>
   isJVM = false,
   pyFiles = Some(PYSPARK_CONTAINER_TESTS))
   }
+
+  test("Run PySpark with memory customization", k8sTestTag) {
+sparkAppConf
+  .set("spark.kubernetes.container.image", 
s"${getTestImageRepo}/spark-py:${getTestImageTag}")
--- End diff --

Some of this stuff can be factored out, I think; we just haven't done so yet. I 
wouldn't block merging this on such a refactor, but this entire test class could 
probably use some cleanup with respect to how the code is structured.
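
One possible shape for that refactor, purely as a sketch (the trait and helper 
names here are hypothetical, not existing code):

```scala
// Shared image-name construction pulled into one place so that each test only
// states its intent. The two abstract accessors mirror the existing suite's
// getTestImageRepo/getTestImageTag.
trait PySparkTestImage {
  def getTestImageRepo: String
  def getTestImageTag: String

  def pySparkImage: String = s"$getTestImageRepo/spark-py:$getTestImageTag"
}
```

Each test body would then reduce to something like 
`sparkAppConf.set("spark.kubernetes.container.image", pySparkImage)`.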


---




[GitHub] spark issue #22257: [SPARK-25264][K8S] Fix comma-delineated arguments passed...

2018-08-31 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22257
  
Looks good, thanks. Merging.


---




[GitHub] spark issue #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.memory lim...

2018-08-31 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22298
  
https://issues.apache.org/jira/browse/SPARK-25291 looks like a real issue with 
the way the tests are written, so we don't necessarily want to ignore it for 
this patch - but we're still thinking about it.


---




[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...

2018-09-05 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21669#discussion_r215446604
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -51,7 +51,11 @@ private[spark] class KubernetesDriverBuilder(
 provideJavaStep: (
   KubernetesConf[KubernetesDriverSpecificConf]
 => JavaDriverFeatureStep) =
-new JavaDriverFeatureStep(_)) {
+new JavaDriverFeatureStep(_),
--- End diff --

The argument type is a function whose default value is the real constructor. It 
might be cleaner to write this as follows:

- The constructor in the class declaration has _no_ default arguments.
- Provide an empty-arg constructor (`def this()`) which passes all of the 
default implementations along to the full-args constructor.

In this case default arguments are hurting more than they are helping: in the 
unit tests we always supply mocks for _every_ arg, meaning we always use all 
defaults (real implementations) or no defaults (test implementations).
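
Roughly like this - a sketch with placeholder step types, not the actual 
KubernetesDriverBuilder signature:

```scala
class Conf
class BasicFeatureStep(conf: Conf)
class JavaFeatureStep(conf: Conf)

// The primary constructor takes every provider explicitly (what the unit
// tests use with mocks); the empty-arg auxiliary constructor wires in the
// real implementations (what production code uses).
class DriverBuilder(
    provideBasicStep: Conf => BasicFeatureStep,
    provideJavaStep: Conf => JavaFeatureStep) {

  def this() = this(
    conf => new BasicFeatureStep(conf),
    conf => new JavaFeatureStep(conf))
}
```

Production code calls `new DriverBuilder()`; tests construct it with a mock for 
every argument, so neither path ever mixes defaults with overrides.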


---




[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files

2018-09-05 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/22146
  
I think we still need some consensus on the driver container API. Personally, I 
think it's clearer to use a configuration option to specifically select the 
container that is the driver, but this is an opinion I hold loosely.

I'd like a specific weigh-in from @erikerlandson and @liyinan926.

Aside from that, this looks good on my end, apart from the tests we need to 
add, and should be ready to merge pretty soon (depending also on the Spark 2.4 
timeline).


---




[GitHub] spark pull request #22323: [SPARK-25262][K8S] Allow SPARK_LOCAL_DIRS to be t...

2018-09-05 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22323#discussion_r215456338
  
--- Diff: docs/running-on-kubernetes.md ---
@@ -215,6 +215,19 @@ 
spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.clai
 
 The configuration properties for mounting volumes into the executor pods 
use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. 
For a complete list of available options for each supported type of volumes, 
please refer to the [Spark Properties](#spark-properties) section below. 
 
+## Local Storage
+
+Spark uses temporary scratch space to spill data to disk during shuffles and other operations.  When using Kubernetes as the resource manager, the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`.  If no directories are explicitly specified, then a default directory is created and configured appropriately.
+
+`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do 
not persist beyond the life of the pod.
+
+### Using RAM for local storage
+
+As `emptyDir` volumes use the node's backing storage for ephemeral storage, this default behaviour may not be appropriate for some compute environments.  For example, if you have diskless nodes with remote storage mounted over a network, having lots of executors doing IO to this remote storage may actually degrade performance.
+
+In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM-backed volumes.  When configured like this, Spark's local storage usage will count towards your pod's memory usage, so you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties.
--- End diff --

You can't allocate space for tmpfs via `spark.{driver,executor}.memory` because 
that will strictly be allocated to the heap. The Java command is basically this:

`/bin/java -Xmx${spark.driver.memory}`

hence dedicating space to tmpfs strictly requires increasing the memory overhead.
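
Concretely, something like this (sizes here are illustrative only):

```scala
import org.apache.spark.SparkConf

// With tmpfs-backed local dirs, scratch space comes out of the pod's memory
// limit but *not* the JVM heap, so it's the overhead setting that needs to
// grow, not spark.{driver,executor}.memory.
val conf = new SparkConf()
  .set("spark.kubernetes.local.dirs.tmpfs", "true")
  .set("spark.driver.memory", "4g")         // becomes -Xmx4g: heap only
  .set("spark.driver.memoryOverhead", "2g") // non-heap room, including tmpfs
```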


---




[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files

2018-09-05 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/22146#discussion_r215452489
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 ---
@@ -81,9 +95,12 @@ private[spark] class KubernetesDriverBuilder(
   .getOrElse(provideJavaStep(kubernetesConf))
 
 val allFeatures = (baseFeatures :+ bindingsStep) ++
-  secretFeature ++ envSecretFeature ++ volumesFeature
+  secretFeature ++ envSecretFeature ++ volumesFeature ++ 
podTemplateFeature
 
-var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
+var spec = KubernetesDriverSpec(
+  provideInitialPod(),
+  Seq.empty,
--- End diff --

Ping on this


---



