[GitHub] spark pull request #20167: [SPARK-16501] [MESOS] Allow providing Mesos princ...

2018-02-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/20167#discussion_r165994809
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -71,40 +74,64 @@ trait MesosSchedulerUtils extends Logging {
       failoverTimeout: Option[Double] = None,
       frameworkId: Option[String] = None): SchedulerDriver = {
     val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
-    val credBuilder = Credential.newBuilder()
+
+    fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+      conf.get(DRIVER_HOST_ADDRESS)))
     webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
     checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
     failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
     frameworkId.foreach { id =>
       fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
     }
-
-    fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
-      conf.get(DRIVER_HOST_ADDRESS)))
-    conf.getOption("spark.mesos.principal").foreach { principal =>
-      fwInfoBuilder.setPrincipal(principal)
-      credBuilder.setPrincipal(principal)
-    }
-    conf.getOption("spark.mesos.secret").foreach { secret =>
-      credBuilder.setSecret(secret)
-    }
-    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
-      throw new SparkException(
-        "spark.mesos.principal must be configured when spark.mesos.secret is set")
-    }
+
     conf.getOption("spark.mesos.role").foreach { role =>
       fwInfoBuilder.setRole(role)
     }
     val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
     if (maxGpus > 0) {
       fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
     }
+    val credBuilder = buildCredentials(conf, fwInfoBuilder)
     if (credBuilder.hasPrincipal) {
       new MesosSchedulerDriver(
         scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
     } else {
       new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
     }
   }
+
+  def buildCredentials(
+      conf: SparkConf,
+      fwInfoBuilder: Protos.FrameworkInfo.Builder): Protos.Credential.Builder = {
+    val credBuilder = Credential.newBuilder()
+    conf.getOption("spark.mesos.principal")
+      .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL")))
--- End diff --

I would want to make sure that @susanxhuynh and/or @skonto agree, but I 
think this is probably fine. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20167: [SPARK-16501] [MESOS] Allow providing Mesos princ...

2018-01-30 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/20167#discussion_r164825718
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -71,40 +74,64 @@ trait MesosSchedulerUtils extends Logging {
       failoverTimeout: Option[Double] = None,
       frameworkId: Option[String] = None): SchedulerDriver = {
     val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
-    val credBuilder = Credential.newBuilder()
+
+    fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+      conf.get(DRIVER_HOST_ADDRESS)))
     webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
     checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
     failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
     frameworkId.foreach { id =>
       fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
     }
-
-    fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
-      conf.get(DRIVER_HOST_ADDRESS)))
-    conf.getOption("spark.mesos.principal").foreach { principal =>
-      fwInfoBuilder.setPrincipal(principal)
-      credBuilder.setPrincipal(principal)
-    }
-    conf.getOption("spark.mesos.secret").foreach { secret =>
-      credBuilder.setSecret(secret)
-    }
-    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
-      throw new SparkException(
-        "spark.mesos.principal must be configured when spark.mesos.secret is set")
-    }
+
     conf.getOption("spark.mesos.role").foreach { role =>
       fwInfoBuilder.setRole(role)
     }
     val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
     if (maxGpus > 0) {
       fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
     }
+    val credBuilder = buildCredentials(conf, fwInfoBuilder)
     if (credBuilder.hasPrincipal) {
       new MesosSchedulerDriver(
         scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
     } else {
       new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
     }
   }
+
+  def buildCredentials(
+      conf: SparkConf,
+      fwInfoBuilder: Protos.FrameworkInfo.Builder): Protos.Credential.Builder = {
+    val credBuilder = Credential.newBuilder()
+    conf.getOption("spark.mesos.principal")
+      .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL")))
--- End diff --

Sorry for the delay. I have a use case where I start the Dispatcher in the 
Mesos cluster and then execute `spark-submit` cluster-mode calls from within 
the container. Unfortunately this requires me to unset a few environment 
variables (`MESOS_EXECUTOR_ID`, `MESOS_FRAMEWORK_ID`, `MESOS_SLAVE_ID`, 
`MESOS_SLAVE_PID`, `MESOS_TASK_ID`) because they interfere with 
`spark-submit` due to this function in the REST client.

If the Dispatcher is started in a mode where it needs these Mesos 
authentication credentials, can we assume that we'll always want to forward 
them this same way? I realize I might be getting into the weeds here and this 
might be a _me_ problem, but I thought I'd bring it up.


---




[GitHub] spark pull request #20167: [SPARK-16501] [MESOS] Allow providing Mesos princ...

2018-01-22 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/20167#discussion_r163001901
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -71,40 +74,64 @@ trait MesosSchedulerUtils extends Logging {
       failoverTimeout: Option[Double] = None,
       frameworkId: Option[String] = None): SchedulerDriver = {
     val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
-    val credBuilder = Credential.newBuilder()
+
+    fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+      conf.get(DRIVER_HOST_ADDRESS)))
     webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
     checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
     failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
     frameworkId.foreach { id =>
       fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
     }
-
-    fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
-      conf.get(DRIVER_HOST_ADDRESS)))
-    conf.getOption("spark.mesos.principal").foreach { principal =>
-      fwInfoBuilder.setPrincipal(principal)
-      credBuilder.setPrincipal(principal)
-    }
-    conf.getOption("spark.mesos.secret").foreach { secret =>
-      credBuilder.setSecret(secret)
-    }
-    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
-      throw new SparkException(
-        "spark.mesos.principal must be configured when spark.mesos.secret is set")
-    }
+
     conf.getOption("spark.mesos.role").foreach { role =>
       fwInfoBuilder.setRole(role)
     }
     val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
     if (maxGpus > 0) {
       fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
     }
+    val credBuilder = buildCredentials(conf, fwInfoBuilder)
     if (credBuilder.hasPrincipal) {
       new MesosSchedulerDriver(
         scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
     } else {
       new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
     }
   }
+
+  def buildCredentials(
+      conf: SparkConf,
+      fwInfoBuilder: Protos.FrameworkInfo.Builder): Protos.Credential.Builder = {
+    val credBuilder = Credential.newBuilder()
+    conf.getOption("spark.mesos.principal")
+      .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL")))
--- End diff --

If you use this environment variable, then won't these always be set when 
using the [REST 
client](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala#L407)?
 Is this the desired behavior? 
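To make the concern concrete, the forwarding behavior under discussion can be sketched roughly as below. This is an illustrative approximation of forwarding driver-side environment variables by name prefix, not the exact Spark implementation; `forwardedEnv` is a hypothetical name.

```scala
// Hedged sketch: the REST submission client forwards environment variables
// roughly by name prefix, so a SPARK_MESOS_* variable set where spark-submit
// runs would ride along automatically. "forwardedEnv" is illustrative only.
def forwardedEnv(env: Map[String, String]): Map[String, String] =
  env.filter { case (k, _) => k.startsWith("SPARK_") || k.startsWith("MESOS_") }

val sample = Map(
  "SPARK_MESOS_PRINCIPAL" -> "spark-framework",  // would be forwarded
  "MESOS_TASK_ID"         -> "driver-123",       // would be forwarded
  "HOME"                  -> "/root")            // would not

println(forwardedEnv(sample).keys.toList.sorted)  // List(MESOS_TASK_ID, SPARK_MESOS_PRINCIPAL)
```

Under this assumption, any `SPARK_MESOS_PRINCIPAL` present in the submitter's environment is always propagated, which is exactly the question raised above.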


---




[GitHub] spark pull request #20167: [SPARK-16501] [MESOS] Allow providing Mesos princ...

2018-01-22 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/20167#discussion_r162997172
  
--- Diff: docs/running-on-mesos.md ---
@@ -427,15 +437,30 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.mesos.principal</code></td>
   <td>(none)</td>
   <td>
-    Set the principal with which Spark framework will use to authenticate with Mesos.
+    Set the principal with which Spark framework will use to authenticate with Mesos.  You can also specify this via the environment variable `SPARK_MESOS_PRINCIPAL`
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.principal.file</code></td>
+  <td>(none)</td>
+  <td>
+    Set the file containing the principal with which Spark framework will use to authenticate with Mesos.  Allows specifying the principal indirectly in more security conscious deployments.  The file must be readable by the user launching the job.  You can also specify this via the environment variable `SPARK_MESOS_PRINCIPAL_FILE`
+  </td>
 </tr>
 <tr>
   <td><code>spark.mesos.secret</code></td>
   <td>(none)</td>
   <td>
     Set the secret with which Spark framework will use to authenticate with Mesos. Used, for example, when
-    authenticating with the registry.
+    authenticating with the registry.  You can also specify this via the environment variable `SPARK_MESOS_SECRET`
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.secret.file</code></td>
+  <td>(none)</td>
+  <td>
--- End diff --

Maybe comment on the format of the secret? I.e. we assume this is a 
plaintext secret, not a binary one? Is that true? 
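The plaintext-vs-binary question can be made concrete with a small sketch. The helper names below are hypothetical, not Spark APIs; they only illustrate the two possible readings of a secret file.

```scala
// Sketch: a "plaintext" secret is decoded as UTF-8 and trimmed (dropping the
// trailing newline most editors add), while a "binary" secret must be read as
// raw bytes. Helpers are illustrative, not Spark APIs.
import java.nio.file.{Files, Paths}

def plaintextSecret(path: String): String =
  new String(Files.readAllBytes(Paths.get(path)), "UTF-8").trim

def binarySecret(path: String): Array[Byte] =
  Files.readAllBytes(Paths.get(path))

val tmp = Files.createTempFile("secret", ".txt")
Files.write(tmp, "hunter2\n".getBytes("UTF-8"))
println(plaintextSecret(tmp.toString))      // hunter2
println(binarySecret(tmp.toString).length)  // 8 (the newline is still counted)
```

Whether trimming is safe is exactly the format question being raised: it is correct for plaintext secrets but corrupts binary ones.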


---




[GitHub] spark issue #20167: Allow providing Mesos principal & secret via files (SPAR...

2018-01-13 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/20167
  
Hello @rvesse, thanks for this patch! With Mesos now having [reference 
secrets](https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto#L2596),
 I think providing this as a file is a natural evolution.

@vanzin, @rvesse: why not also allow `spark.authenticate.secret` to be a 
file? We have a 
[patch](https://github.com/mesosphere/spark/commit/a897c1ca07e67935b8ae80cfc1d41370fe8ef60b)
 at Mesosphere that allows this. It can be in a different JIRA/PR; I just 
want an opinion.


---




[GitHub] spark pull request #20167: Allow providing Mesos principal & secret via file...

2018-01-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/20167#discussion_r161385124
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -80,10 +80,27 @@ trait MesosSchedulerUtils extends Logging {
     }
     fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
       conf.get(DRIVER_HOST_ADDRESS)))
+    conf.getOption("spark.mesos.principal.file")
+      .orElse(Option(conf.getenv("SPARK_MESOS_PRINCIPAL_FILE"))
+      .foreach { principalFile =>
+        val file = io.Source.fromFile(principalFile)
+        val principal = file.getLines.next()
+        file.close
+        fwInfoBuilder.setPrincipal(principal)
+        credBuilder.setPrincipal(principal)
+      }
     conf.getOption("spark.mesos.principal").foreach { principal =>
       fwInfoBuilder.setPrincipal(principal)
       credBuilder.setPrincipal(principal)
     }
+    conf.getOption("spark.mesos.secret.file")
+      .orElse(Option(conf.getenv("SPARK_MESOS_SECRET_FILE"))
--- End diff --

Environment-variable secrets (even those using the `secrets` primitive, up to 
Mesos 1.4) will be visible at `/proc/<pid>/environ`, so file-based secrets 
are more secure. I'm tempted to say having an insecure _option_ is worse than 
having less flexibility. 
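The exposure being described can be illustrated directly. On Linux, `/proc/<pid>/environ` holds a process's environment as NUL-separated `KEY=VALUE` pairs, readable by the same user (and root); the sketch below parses that format, with a sample string standing in for the real file contents.

```scala
// Sketch: parse /proc/<pid>/environ-style content (NUL-separated KEY=VALUE
// pairs). Anyone who can read that file recovers an env-var secret verbatim,
// which is the leak being discussed; the sample string stands in for the file.
def parseEnviron(raw: String): Map[String, String] =
  raw.split('\u0000').toList.filter(_.contains('=')).map { kv =>
    val Array(k, v) = kv.split("=", 2)
    k -> v
  }.toMap

val sample = "SPARK_MESOS_SECRET=hunter2\u0000PATH=/usr/bin"
println(parseEnviron(sample)("SPARK_MESOS_SECRET"))  // hunter2
```

A file-based secret avoids this channel entirely, leaving exposure governed only by file permissions.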


---




[GitHub] spark pull request #20167: Allow providing Mesos principal & secret via file...

2018-01-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/20167#discussion_r161385080
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -80,10 +80,27 @@ trait MesosSchedulerUtils extends Logging {
     }
     fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
       conf.get(DRIVER_HOST_ADDRESS)))
+    conf.getOption("spark.mesos.principal.file")
--- End diff --

I agree. I don't think it's necessary to put the principal in a file, just 
the secret. 


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-28 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r153510604
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -542,7 +496,54 @@ private[spark] class SecurityManager(
    * Gets the secret key.
    * @return the secret key as a String if authentication is enabled, otherwise returns null
    */
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+    if (isAuthenticationEnabled) {
+      Option(sparkConf.getenv(ENV_AUTH_SECRET))
--- End diff --

FWIW, in Mesos, we are planning on using the [Secrets 
primitives](https://github.com/apache/spark/blob/b3f9dbf48ec0938ff5c98833bb6b6855c620ef57/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala#L26)
 to distribute `ENV_AUTH_SECRET`. This way Mesos and YARN can both use the same 
secret-generation code and only differ in the distribution of the secret. 
`SPARK_AUTH_SECRET_CONF` is already somewhat awkward because it has to be in 
the config.


---




[GitHub] spark pull request #19793: [SPARK-22574] [Mesos] [Submit] Check submission r...

2017-11-27 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19793#discussion_r153348394
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
 ---
@@ -82,6 +82,12 @@ private[mesos] class MesosSubmitRequestServlet(
     val mainClass = Option(request.mainClass).getOrElse {
       throw new SubmitRestMissingFieldException("Main class is missing.")
     }
+    val appArgs = Option(request.appArgs).getOrElse {
+      throw new SubmitRestMissingFieldException("Application arguments are missing.")
--- End diff --

Nit: maybe put the fields here also? Something like `Application arguments 
("appArgs") are missing`. Similar below. 
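The suggested wording might look like the following sketch. Only the exception name comes from the diff; `requiredField` is a hypothetical helper for illustration.

```scala
// Sketch of the nit: include the JSON field name in the error message so the
// caller knows exactly which request field to fix. "requiredField" is a
// hypothetical helper, not Spark code.
class SubmitRestMissingFieldException(msg: String) extends Exception(msg)

def requiredField[T](value: T, field: String, what: String): T =
  Option(value).getOrElse {
    throw new SubmitRestMissingFieldException(s"""$what ("$field") are missing.""")
  }

val err =
  try { requiredField(null, "appArgs", "Application arguments"); "no error" }
  catch { case e: SubmitRestMissingFieldException => e.getMessage }
println(err)  // Application arguments ("appArgs") are missing.
```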


---




[GitHub] spark issue #19390: [SPARK-18935][MESOS] Fix dynamic reservations on mesos

2017-11-27 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19390
  
@skonto I checked this again. Works great, thanks for the patch. 


---




[GitHub] spark issue #19793: [SPARK-22574] [Mesos] [Submit] Check submission request ...

2017-11-27 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19793
  
@Gschiavon Looking.. thanks for the ping. 


---




[GitHub] spark issue #19798: [SPARK-22583] First delegation token renewal time is not...

2017-11-22 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19798
  
@kalvinnchau thanks for catching this mistake. @vanzin can we get a quick 
merge? 


---




[GitHub] spark issue #19390: [SPARK-18935][MESOS] Fix dynamic reservations on mesos

2017-11-22 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19390
  
Just to reiterate our conversation here. 

The `AllocationInfo` tells the scheduler which `role`'s allocation of 
resources the `Offer` is coming from, whereas the `ReservationInfo` tells the 
scheduler whether or not the `Offer` contains reserved resources. We needed 
the latter to effectively use dynamically reserved resources.

From your (old) PR it looks like the `AllocationInfo` is mostly just 
"forwarded", i.e. there isn't any logic around its contents. So your new PR 
removes references to `AllocationInfo` to avoid breaking with Mesos 1.3 and 
earlier and to remain non-MULTI_ROLE.

The purpose of `AllocationInfo` is (in the future) to be able to have some 
logic around which tasks get launched on resources allocated to the various 
roles that a framework is subscribed as, but we don't have that yet and it 
isn't addressed in this patch. 

---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-11-15 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
@vanzin Thanks for the reviews and mentorship! 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-15 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r151239494
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+    conf: SparkConf,
+    hadoopConfig: Configuration,
+    driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+    new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+    try {
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+      (SparkHadoopUtil.get.serialize(creds), rt)
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to fetch Hadoop delegation tokens $e")
+        throw e
+    }
+  }
+
+  private val keytabFile: Option[String] = conf.get(config.KEYTAB)
+
+  scheduleTokenRenewal()
+
+  private def scheduleTokenRenewal(): Unit = {
+    if (keytabFile.isDefined) {
+      require(principal != null, "Principal is required for Keytab-based authentication")
+      logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
+    } else {
+      logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
+      return
+    }
+
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      if (remainingTime <= 0) {
+        logInfo("Credentials have expired, creating new ones now.")
+        runnable.run()
+      } else {
+        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+      }
+

[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-11-14 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
@vanzin PTAL. 
I removed the awkward `mode` parameter from the token manager. Now we only 
start the renewer thread when using a keytab/principal. The condition is logged 
appropriately. 
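The 75%-of-interval rule described in the token manager's scaladoc reduces to simple arithmetic. This sketch is assumed from that description, not copied from the patch:

```scala
// Sketch of the renewal schedule described for the token manager: renew once
// 75% of the interval until expiry has elapsed; if the expiry is already in
// the past, renew immediately (delay of zero).
def nextRenewalTime(nowMs: Long, expiryMs: Long, ratio: Double = 0.75): Long =
  nowMs + math.max(0L, ((expiryMs - nowMs) * ratio).toLong)

println(nextRenewalTime(0L, 1000L))     // 750
println(nextRenewalTime(2000L, 1000L))  // 2000 (expired: renew now)
```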


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-14 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150942920
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+    conf: SparkConf,
+    hadoopConfig: Configuration,
+    driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+    new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: Option[String], mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+    try {
+      val creds = UserGroupInformation.getCurrentUser.getCredentials
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+      (SparkHadoopUtil.get.serialize(creds), rt)
+    } catch {
+      case e: Exception =>
+        logError("Failed to initialize Hadoop delegation tokens\n" +
+          s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
+        throw e
+    }
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (Option[String], String) = {
+    val keytab = conf.get(config.KEYTAB)
+    val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
+    val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
+      // if a keytab and a specific ticket cache is specified use the keytab and log the behavior
+      logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
+        s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
+      (keytab, "keytab")
+    } else {
+      val m = if (keytab.isDefined) "keytab" else "tgt"
+      val sf = if (keytab.isDefined) keytab else tgt
+      (sf, m)
+    }
+
+    if (principal == null) {

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-14 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150933634
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop 
delegation tokens on the behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN 
AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal 
interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf,
+hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: Option[String], mode: String) = 
getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: 
${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+logError("Failed to initialize Hadoop delegation tokens\n" +
+  s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file 
$secretFile\n\tException: $e")
+throw e
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (Option[String], String) = {
+val keytab = conf.get(config.KEYTAB)
+val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
+val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
+  // if a keytab and a specific ticket cache are specified, use the keytab and log the behavior
+  logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
+s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab.isDefined) "keytab" else "tgt"
+  val sf = if (keytab.isDefined) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
  

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150717225
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf,
+hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: Option[String], mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+logError("Failed to initialize Hadoop delegation tokens\n" +
+  s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
+throw e
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (Option[String], String) = {
+val keytab = conf.get(config.KEYTAB)
+val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
+val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
+  // if a keytab and a specific ticket cache are specified, use the keytab and log the behavior
+  logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
+s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab.isDefined) "keytab" else "tgt"
+  val sf = if (keytab.isDefined) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
  

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r150711225
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosHadoopDelegationTokenManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
+ * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
+ * and similarly will renew the Credentials when 75% of the renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+private[spark] class MesosHadoopDelegationTokenManager(
+conf: SparkConf,
+hadoopConfig: Configuration,
+driverEndpoint: RpcEndpointRef)
+  extends Logging {
+
+  require(driverEndpoint != null, "DriverEndpoint is not initialized")
+
+  private val credentialRenewerThread: ScheduledExecutorService =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")
+
+  private val tokenManager: HadoopDelegationTokenManager =
+new HadoopDelegationTokenManager(conf, hadoopConfig)
+
+  private val principal: String = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile: Option[String], mode: String) = getSecretFile(conf)
+
+  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
+try {
+  val creds = UserGroupInformation.getCurrentUser.getCredentials
+  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+  logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
+  (SparkHadoopUtil.get.serialize(creds), rt)
+} catch {
+  case e: Exception =>
+logError("Failed to initialize Hadoop delegation tokens\n" +
+  s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
+throw e
+}
+  }
+
+  scheduleTokenRenewal()
+
+  private def getSecretFile(conf: SparkConf): (Option[String], String) = {
+val keytab = conf.get(config.KEYTAB)
+val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
+val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
+  // if a keytab and a specific ticket cache are specified, use the keytab and log the behavior
+  logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
+s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab.isDefined) "keytab" else "tgt"
+  val sf = if (keytab.isDefined) keytab else tgt
+  (sf, m)
+}
+
+if (principal == null) {
  

[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-11-13 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
Hello @maverick2202, hopefully 2.3 (and maybe back ported?) but that's up 
to the Committers. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-11-12 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
Hello @vanzin thanks for the continued help with this, anything else 
needed? 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-08 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149797247
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

Check out the patch now. `hadoopDelegationTokens` now calls `initializeHadoopDelegationTokens` (renamed `fetchHadoopDelegationTokens`) by name:
```scala
  private val hadoopDelegationTokens: () => Option[Array[Byte]] = fetchHadoopDelegationTokens
```
This has the effect of only generating the first set of delegation tokens once the first `RetrieveSparkAppConfig` message is received. At this point, everything has been initialized because the renewer (renamed `MesosHadoopDelegationTokenManager`) is evaluated lazily with the correct `driverEndpoint`.

It's a bit convoluted just to avoid an extra conditional. WDYT?
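The deferred-evaluation behavior described here can be sketched in isolation (hypothetical names, not the actual Spark code): assigning a method to a function-typed `val` creates a function value without invoking it, so the expensive initializer runs only on the first call.

```scala
object ByNameInitSketch {
  private var fetched = false

  private def fetchTokens(): Option[Array[Byte]] = {
    fetched = true
    Some(Array[Byte](1, 2, 3))
  }

  // Eta-expansion: the method is wrapped as a function value; nothing is
  // fetched at declaration time, mirroring
  // `hadoopDelegationTokens: () => Option[Array[Byte]]`.
  private val tokens: () => Option[Array[Byte]] = fetchTokens

  def main(args: Array[String]): Unit = {
    assert(!fetched)            // nothing fetched at construction time
    val t = tokens()            // first access, e.g. on RetrieveSparkAppConfig
    assert(fetched && t.isDefined)
  }
}
```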


---




[GitHub] spark issue #19555: [SPARK-22133][DOCS] Documentation for Mesos Reject Offer...

2017-11-08 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19555
  
Hey @srowen can we get a merge on this? 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149564294
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

I may have spoken too soon, there might be a way...


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r149549953
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,14 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far, then start the token renewer
+if (hadoopDelegationTokens.isDefined) {
--- End diff --

I agree that I shouldn't need to use the conditional 
`hadoopDelegationTokens.isDefined`, however there will need to be some check 
(`UserGroupInformation.isSecurityEnabled` or similar) to pass the 
`driverEndpoint` to the renewer/manager here. When the initial tokens are 
generated `driverEndpoint` is still `None` because `start()` hasn't been called 
yet. 
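The ordering constraint described here can be sketched with illustrative names (this is an assumption-laden simplification, not Spark's actual backend): guard on `UserGroupInformation.isSecurityEnabled` and defer renewer construction with `lazy val` so it is built only after `start()` has set the endpoint.

```scala
import org.apache.hadoop.security.UserGroupInformation

// Hypothetical simplified backend; names are illustrative.
class BackendSketch {
  // Set by start(); None during construction.
  private var driverEndpoint: Option[String] = None

  // lazy: the renewer is not built until first accessed, by which point
  // start() has populated driverEndpoint. The isSecurityEnabled check keeps
  // the renewer (and token initialization) from ever running on an
  // unsecured cluster.
  private lazy val tokenRenewer: Option[String] =
    if (UserGroupInformation.isSecurityEnabled) driverEndpoint else None

  def start(): Unit = {
    driverEndpoint = Some("driver-endpoint")
    // First access happens here, after the endpoint is available.
    tokenRenewer.foreach(ep => println(s"starting renewer with $ep"))
  }
}
```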


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149466183
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -542,7 +496,55 @@ private[spark] class SecurityManager(
* Gets the secret key.
* @return the secret key as a String if authentication is enabled, 
otherwise returns null
*/
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+if (isAuthenticationEnabled) {
+  Option(sparkConf.getenv(ENV_AUTH_SECRET))
+.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
+.getOrElse {
+  throw new IllegalArgumentException(
+s"A secret key must be specified via the 
$SPARK_AUTH_SECRET_CONF config")
+}
+} else {
+  null
+}
+  }
+
+  /**
+   * Initialize the configuration object held by this class for 
authentication.
+   *
+   * If authentication is disabled, do nothing.
+   *
+   * In YARN mode, generate a secret key and store it in the configuration 
object, setting it up to
+   * also be propagated to executors using an env variable.
+   *
+   * In other modes, assert that the auth secret is set in the 
configuration.
+   */
+  def initializeAuth(): Unit = {
+if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+  return
+}
+
+if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
+  require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF 
config.")
+  return
+}
+
+// In YARN, force creation of a new secret if this is client mode. 
This ensures each
--- End diff --

Ok, thanks for the clarification. 


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149456772
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala ---
@@ -398,9 +399,20 @@ private[spark] object RestSubmissionClient {
   val PROTOCOL_VERSION = "v1"
 
   /**
-   * Submit an application, assuming Spark parameters are specified 
through the given config.
-   * This is abstracted to its own method for testing purposes.
+   * Filter non-spark environment variables from any environment.
*/
+  private[rest] def filterSystemEnvironment(env: Map[String, String]): 
Map[String, String] = {
+env.filterKeys { k =>
+  // SPARK_HOME is filtered out because it is usually wrong on the 
remote machine (SPARK-12345)
+  (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != 
"SPARK_HOME") ||
+k.startsWith("MESOS_")
--- End diff --

Yes, I apologize, you're correct. I think this is actually to filter out 
things like `MESOS_EXECUTOR_ID` and `MESOS_FRAMEWORK_ID`
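For reference, the quoted predicate *selects* (keeps) keys with a `SPARK_` prefix, minus `SPARK_ENV_LOADED` and `SPARK_HOME`, plus anything prefixed `MESOS_`. A standalone sketch of that behavior (hypothetical object name, using `filter` in place of `filterKeys`):

```scala
object EnvFilterSketch {
  // Reproduction of the quoted predicate, for illustration only.
  def filterSystemEnvironment(env: Map[String, String]): Map[String, String] =
    env.filter { case (k, _) =>
      (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") ||
        k.startsWith("MESOS_")
    }

  def main(args: Array[String]): Unit = {
    val env = Map(
      "SPARK_SCALA_VERSION" -> "2.11",
      "SPARK_HOME"          -> "/opt/spark", // dropped: usually wrong on the remote machine
      "SPARK_ENV_LOADED"    -> "1",          // dropped
      "MESOS_FRAMEWORK_ID"  -> "fw-1",       // kept: matches the MESOS_ prefix
      "PATH"                -> "/usr/bin"    // dropped: neither SPARK_ nor MESOS_
    )
    val kept = filterSystemEnvironment(env)
    assert(kept.keySet == Set("SPARK_SCALA_VERSION", "MESOS_FRAMEWORK_ID"))
  }
}
```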


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149455493
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -542,7 +496,55 @@ private[spark] class SecurityManager(
* Gets the secret key.
* @return the secret key as a String if authentication is enabled, 
otherwise returns null
*/
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+if (isAuthenticationEnabled) {
+  Option(sparkConf.getenv(ENV_AUTH_SECRET))
+.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
+.getOrElse {
+  throw new IllegalArgumentException(
+s"A secret key must be specified via the 
$SPARK_AUTH_SECRET_CONF config")
+}
+} else {
+  null
+}
+  }
+
+  /**
+   * Initialize the configuration object held by this class for 
authentication.
+   *
+   * If authentication is disabled, do nothing.
+   *
+   * In YARN mode, generate a secret key and store it in the configuration 
object, setting it up to
+   * also be propagated to executors using an env variable.
+   *
+   * In other modes, assert that the auth secret is set in the 
configuration.
+   */
+  def initializeAuth(): Unit = {
+if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+  return
+}
+
+if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
+  require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF 
config.")
+  return
+}
+
+// In YARN, force creation of a new secret if this is client mode. 
This ensures each
--- End diff --

Yes. I guess I'm wondering if now with your change will this work in all 
cases not just YARN? Perhaps obviously, I'm looking into changing this for 
Mesos.


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149375998
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala ---
@@ -398,9 +399,20 @@ private[spark] object RestSubmissionClient {
   val PROTOCOL_VERSION = "v1"
 
   /**
-   * Submit an application, assuming Spark parameters are specified 
through the given config.
-   * This is abstracted to its own method for testing purposes.
+   * Filter non-spark environment variables from any environment.
*/
+  private[rest] def filterSystemEnvironment(env: Map[String, String]): 
Map[String, String] = {
+env.filterKeys { k =>
+  // SPARK_HOME is filtered out because it is usually wrong on the 
remote machine (SPARK-12345)
+  (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != 
"SPARK_HOME") ||
+k.startsWith("MESOS_")
--- End diff --

Will this break Mesos when using [the mesos 
bundle](https://github.com/apache/spark/blob/master/conf/spark-env.sh.template#L33)?


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149375879
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -365,22 +370,21 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
 
 // This security manager will not need an auth secret, but set a dummy 
value in case
 // spark.authenticate is enabled, otherwise an exception is thrown.
--- End diff --

this comment is no longer true?


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149375835
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -542,7 +496,55 @@ private[spark] class SecurityManager(
* Gets the secret key.
* @return the secret key as a String if authentication is enabled, 
otherwise returns null
*/
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+if (isAuthenticationEnabled) {
+  Option(sparkConf.getenv(ENV_AUTH_SECRET))
+.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
+.getOrElse {
+  throw new IllegalArgumentException(
+s"A secret key must be specified via the 
$SPARK_AUTH_SECRET_CONF config")
+}
+} else {
+  null
+}
+  }
+
+  /**
+   * Initialize the configuration object held by this class for 
authentication.
+   *
+   * If authentication is disabled, do nothing.
+   *
+   * In YARN mode, generate a secret key and store it in the configuration 
object, setting it up to
+   * also be propagated to executors using an env variable.
+   *
+   * In other modes, assert that the auth secret is set in the 
configuration.
+   */
+  def initializeAuth(): Unit = {
+if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+  return
+}
+
+if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
+  require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF 
config.")
+  return
+}
+
+// In YARN, force creation of a new secret if this is client mode. 
This ensures each
--- End diff --

Is there a reason this _has_ to be unique to YARN? Will this solve the 
problem (in Mesos currently) where when the Executors 
[bootstrap](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L193)
 they do so without security (unless you 
"[bake](https://github.com/apache/spark/blob/e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698/core/src/main/scala/org/apache/spark/SparkConf.scala#L482)"
 the secret and secret config into the container image)? Looks like propagating 
the envvar is only handled in the YARN case?
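The lookup order in the quoted `getSecretKey` (environment variable first, then Spark config, otherwise fail) can be sketched independently of `SecurityManager`. The constant values below are assumptions for illustration, not necessarily Spark's actual key names.

```scala
object SecretLookupSketch {
  // Assumed names, for illustration only.
  val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"

  // Environment wins over config; missing both is a hard error.
  def getSecretKey(env: Map[String, String], conf: Map[String, String]): String =
    env.get(ENV_AUTH_SECRET)
      .orElse(conf.get(SPARK_AUTH_SECRET_CONF))
      .getOrElse(throw new IllegalArgumentException(
        s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config"))

  def main(args: Array[String]): Unit = {
    assert(getSecretKey(Map(ENV_AUTH_SECRET -> "env-secret"), Map.empty) == "env-secret")
    assert(getSecretKey(Map.empty, Map(SPARK_AUTH_SECRET_CONF -> "conf-secret")) == "conf-secret")
  }
}
```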


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-07 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r149361297
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -542,7 +496,55 @@ private[spark] class SecurityManager(
* Gets the secret key.
* @return the secret key as a String if authentication is enabled, 
otherwise returns null
*/
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+if (isAuthenticationEnabled) {
+  Option(sparkConf.getenv(ENV_AUTH_SECRET))
+.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
+.getOrElse {
+  throw new IllegalArgumentException(
+s"A secret key must be specified via the 
$SPARK_AUTH_SECRET_CONF config")
+}
+} else {
+  null
+}
+  }
+
+  /**
+   * Initialize the configuration object held by this class for 
authentication.
+   *
+   * If authentication is disabled, do nothing.
+   *
+   * In YARN mode, generate a secret key and store it in the configuration 
object, setting it up to
+   * also be propagated to executors using an env variable.
+   *
+   * In other modes, assert that the auth secret is set in the 
configuration.
+   */
+  def initializeAuth(): Unit = {
+if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+  return
+}
+
+if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
+  require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF 
config.")
+  return
+}
+
+// In YARN, force creation of a new secret if this is client mode. 
This ensures each
--- End diff --

Is there a reason this _has_ to be unique to YARN? Will this solve the 
problem (in Mesos currently) where when the Executors 
[bootstrap](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L193)
 they do so without security (unless you "bake" the secret and secret config 
into the container image)?


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-11-06 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
Hello @vanzin thanks for continuing to help with this. Please take another 
look at this refactor.

In this change, there is one place to interact with `hadoopDelegationTokens` from `CoarseGrainedSchedulerBackend`: `initializeHadoopDelegationTokens`. This method contains the logic for initializing the tokens and setting a token renewer. It's also now resource-manager specific. This seems cleaner than having a `HadoopDelegationTokenManager` in `CoarseGrainedSchedulerBackend` because any "token management" will always want to wrap `HadoopDelegationTokenManager`, so you can keep all the necessary information in one place. Of course, happy to discuss further.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148982457
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -58,8 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
 with org.apache.mesos.Scheduler with MesosSchedulerUtils {
 
-  override def hadoopDelegationTokenManager: 
Option[HadoopDelegationTokenManager] =
-Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
+  private lazy val hadoopCredentialRenewer: MesosCredentialRenewer =
--- End diff --

For Mesos the credential renewer contains the tokens, the renewal time, and all logic to renew the tokens. It should never be evaluated (and tokens never initialized) if `UserGroupInformation.isSecurityEnabled` evaluates to `false`.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148982255
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -686,18 +687,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 true
   }
 
-  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
--- End diff --

This method was only called once, and would discard the renewal time information, limiting its utility.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-11-05 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148982165
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -99,11 +96,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
-  // hadoop token manager used by some sub-classes (e.g. Mesos)
-  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = 
None
--- End diff --

No longer needed because resource-manager backends (may) implement their 
own `initializeHadoopDelegationTokens`.


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-31 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
@vanzin Thanks for the continued review. I addressed the comments you left. 
PTAL.

It seems simplest and more robust to keep the serialized `Credentials` within `CoarseGrainedSchedulerBackend`. The change is that now the renewal time is also kept alongside. I tried to move as much duplicate code as possible to `HadoopDelegationTokenManager`, which remains a "toolbox" as before and doesn't retain much state. The abstract class `HadoopCredentialRenewer` is now part of the scheduler backend also. I refrained from changing too much of the YARN-specific code.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-31 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r148013037
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -233,10 +249,15 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 context.reply(true)
 
   case RetrieveSparkAppConfig =>
+val tokens = if (renewableDelegationTokens.isDefined) {
+  Some(renewableDelegationTokens.get.credentials)
+} else {
+  None
+}
 val reply = SparkAppConfig(
   sparkProperties,
   SparkEnv.get.securityManager.getIOEncryptionKey(),
-  hadoopDelegationCreds)
+  tokens)
--- End diff --

could make `SparkAppConfig` just take a `RenewableDelegationTokens` object? 
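As a side note, the `isDefined`/`get` pattern in the hunk quoted above collapses to a single `Option.map`. A standalone sketch with a stand-in case class (illustrative, not the actual Spark types):

```scala
object OptionMapSketch {
  // Stand-in for the type discussed in the review.
  case class RenewableDelegationTokens(credentials: Array[Byte])

  def main(args: Array[String]): Unit = {
    val renewableDelegationTokens: Option[RenewableDelegationTokens] =
      Some(RenewableDelegationTokens(Array[Byte](1)))

    // Equivalent to the if/else in the quoted diff, without isDefined/get.
    val tokens: Option[Array[Byte]] = renewableDelegationTokens.map(_.credentials)

    assert(tokens.isDefined)
  }
}
```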


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-30 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r147866441
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
 ---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
--- End diff --

`SparkException` is unused; I'm not sure why it was there in the first place.


---




[GitHub] spark issue #18098: [SPARK-16944][Mesos] Improve data locality when launchin...

2017-10-30 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/18098
  
@PerilousApricot what do you mean by topology?



---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-27 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r147463877
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -427,10 +441,10 @@ trait MesosSchedulerUtils extends Logging {
   // partition port offers
   val (resourcesWithoutPorts, portResources) = 
filterPortResources(offeredResources)
 
-  val portsAndRoles = requestedPorts.
-map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+  val portsAndResourceInfo = requestedPorts.
--- End diff --

gotcha. 


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-27 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r147461660
  
--- Diff: dev/deps/spark-deps-hadoop-2.6 ---
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
--- End diff --

The docs say that Spark requires Mesos >= 1.0 IIRC, so as long as it still 
_works_ with 1.0+ (i.e. we're still sending valid messages) I think it's fine. 
Maybe we should update the docs with any limitations for older versions 
(assuming there are any)?


---




[GitHub] spark pull request #19510: [SPARK-22292][Mesos] Added spark.mem.max support ...

2017-10-26 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19510#discussion_r147121858
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -64,6 +64,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val MAX_SLAVE_FAILURES = 2
 
   private val maxCoresOption = 
conf.getOption("spark.cores.max").map(_.toInt)
+  private val maxMemOption = 
conf.getOption("spark.mem.max").map(Utils.memoryStringToMb)
--- End diff --

Do we need to have a check similar to 
https://github.com/apache/spark/blob/06df34d35ec088277445ef09cfb24bfe996f072e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L73
 but for memory, so that we know we'll "land" on the maximum? 
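
A hedged sketch of the analogous memory check (names and numbers here are illustrative assumptions, not Spark's actual code): warn when the per-executor memory does not evenly divide the configured maximum, since then the total can never "land" exactly on it.

```scala
// Illustrative values, in MB (not real configuration parsing).
val maxMemMb = 10240      // hypothetical spark.mem.max
val executorMemMb = 4096  // executor memory plus overhead
val remainderMb = maxMemMb % executorMemMb
if (remainderMb != 0) {
  // Mirrors the spirit of the cores check linked above.
  println(s"spark.mem.max leaves $remainderMb MB that can never be used")
}
```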


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-26 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r147069130
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -427,10 +441,10 @@ trait MesosSchedulerUtils extends Logging {
   // partition port offers
   val (resourcesWithoutPorts, portResources) = 
filterPortResources(offeredResources)
 
-  val portsAndRoles = requestedPorts.
-map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+  val portsAndResourceInfo = requestedPorts.
--- End diff --

Maybe I'm not reading closely enough, but what if the requested port isn't 
available or reserved? Can we request another one, or do we just decline and 
wait?


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-26 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r147067951
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -349,13 +349,22 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   val offerMem = getResource(offer.getResourcesList, "mem")
   val offerCpus = getResource(offer.getResourcesList, "cpus")
   val offerPorts = getRangeResource(offer.getResourcesList, "ports")
+  val offerAllocationInfo = offer.getAllocationInfo
+  val offerReservationInfo = offer
+.getResourcesList
+.asScala
+.find(resource => Option(resource.getReservation).isDefined)
--- End diff --

Could you not just use `resource.hasReservation` here instead of defining a 
whole new method?
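
For illustration, a minimal stand-in (not the real protobuf-generated classes) showing why a `hasReservation` accessor is the idiomatic check:

```scala
// Hypothetical stand-in for the protobuf-generated Resource class. Note that
// real protobuf getters return a default instance rather than null, which is
// exactly why the generated hasX accessor is the reliable presence check.
case class ReservationInfo(principal: String)
class Resource(reservation: Option[ReservationInfo]) {
  def hasReservation: Boolean = reservation.isDefined
  def getReservation: ReservationInfo = reservation.orNull
}

val resources =
  List(new Resource(None), new Resource(Some(ReservationInfo("spark"))))

// The PR's .find(r => Option(r.getReservation).isDefined) reduces to:
val reserved = resources.find(_.hasReservation)
```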


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-26 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r147067641
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -451,15 +465,20 @@ trait MesosSchedulerUtils extends Logging {
   }
 
   /** Creates a mesos resource for a specific port number. */
-  private def createResourcesFromPorts(portsAndRoles: List[(Long, 
String)]) : List[Resource] = {
-portsAndRoles.flatMap{ case (port, role) =>
-  createMesosPortResource(List((port, port)), Some(role))}
+  private def createResourcesFromPorts(
+   portsAndResourcesInfo: List[(Long, (String, AllocationInfo, 
Option[ReservationInfo]))])
+: List[Resource] = {
+portsAndResourcesInfo.flatMap{ case (port, rInfo) =>
+  createMesosPortResource(List((port, port)), Option(rInfo._1),
--- End diff --

Maybe a comment here on what exactly is going on? 


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-26 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r147066762
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -175,17 +176,36 @@ trait MesosSchedulerUtils extends Logging {
 registerLatch.countDown()
   }
 
-  def createResource(name: String, amount: Double, role: Option[String] = 
None): Resource = {
+  private def isNotAnyRole(role: Option[String]): Boolean = {
--- End diff --

I would prefer that this contain all of the logic below (if that's 
possible). For example:

```scala
private def setAllocationAndReservationInfo(
    allocationInfo: Option[AllocationInfo],
    reservationInfo: Option[ReservationInfo],
    role: Option[String]): Unit = {
  if (role.forall(r => !r.equals(ANY_ROLE))) {
    allocationInfo.foreach { alloc => builder.setAllocationInfo(alloc) }
    reservationInfo.foreach { res => builder.setReservation(res) }
  }
}
```


---




[GitHub] spark issue #19543: [SPARK-19606][MESOS] Support constraints in spark-dispat...

2017-10-25 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19543
  
LGTM. 


---




[GitHub] spark issue #19543: [SPARK-19606][MESOS] Support constraints in spark-dispat...

2017-10-25 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19543
  
Just one super-nit.


---




[GitHub] spark pull request #19543: [SPARK-19606][MESOS] Support constraints in spark...

2017-10-25 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19543#discussion_r146971307
  
--- Diff: docs/running-on-mesos.md ---
@@ -458,6 +461,14 @@ See the [configuration page](configuration.html) for 
information on Spark config
   
 
 
+  spark.mesos.driver.constraints
+  (none)
+  
+Same as spark.mesos.constraints except applied to drivers 
when launched through the dispatcher. By default,
+all resource offers will be accepted.
--- End diff --

Maybe "all resource offers _with sufficient resources_ will be accepted"?


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-25 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146855311
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

Maybe an overly simplistic solution, but what about putting this 
functionality into `HadoopDelegationTokenManager`?


---




[GitHub] spark issue #19515: [SPARK-22287][MESOS] SPARK_DAEMON_MEMORY not honored by ...

2017-10-24 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19515
  
LGTM


---




[GitHub] spark issue #19555: [SPARK-22133][DOCS] Documentation for Mesos Reject Offer...

2017-10-24 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19555
  
LGTM


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-24 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146473582
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer(
   private var lastCredentialsFileSuffix = 0
 
   private val credentialRenewer =
-Executors.newSingleThreadScheduledExecutor(
-  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
--- End diff --

I generally agree, but I did this mostly to keep these two classes as 
congruent as possible. I can change it back if you feel strongly. 
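
For context, a self-contained sketch of what a named daemon single-thread scheduled executor amounts to (roughly what `ThreadUtils.newDaemonSingleThreadScheduledExecutor` does; Spark's actual implementation may differ in details):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

def namedDaemonScheduledExecutor(name: String): ScheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true) // daemon, so the renewal thread never keeps the JVM alive
      t
    }
  })

val exec = namedDaemonScheduledExecutor("Credential Refresh Thread")
exec.shutdown()
```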


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-24 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146472162
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

Ok, I agree. I refrained from touching the current code only because it was 
added so recently. I assumed the omission of the renewal time was intentional 
(or at least semi-intentional). I'll submit a refactor. Are you suggesting that 
we make the renewal thread part of `CoarseGrainedSchedulerBackend` (with the 
appropriate overrides for YARN/Mesos)? Or make a resource-manager-specific 
method?
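
On the renewal-time point, a hedged sketch of the scheduling convention used by the YARN renewer (the 0.75 ratio is an assumption here; check `AMCredentialRenewer` for the exact value): renew at roughly 75% of a token's remaining validity.

```scala
// Compute the delay until the next renewal attempt, as a fraction of the
// token's remaining validity (ratio is illustrative).
def nextRenewalDelayMs(nowMs: Long, expiryMs: Long, ratio: Double = 0.75): Long =
  ((expiryMs - nowMs) * ratio).toLong

val delayMs = nextRenewalDelayMs(nowMs = 0L, expiryMs = 1000L)
```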


---




[GitHub] spark issue #19515: [SPARK-22287][MESOS] SPARK_DAEMON_MEMORY not honored by ...

2017-10-23 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19515
  
LGTM


---




[GitHub] spark issue #19515: [SPARK-22287][MESOS] SPARK_DAEMON_MEMORY not honored by ...

2017-10-23 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19515
  
Yeah that sounds good. Thanks for this!


---




[GitHub] spark pull request #19555: [SPARK-22133][DOCS] Documentation for Mesos Rejec...

2017-10-23 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19555#discussion_r146244212
  
--- Diff: docs/running-on-mesos.md ---
@@ -613,6 +621,41 @@ See the [configuration page](configuration.html) for 
information on Spark config
 driver disconnects, the master immediately tears down the framework.
   
 
+
+  spark.mesos.rejectOfferDuration
+  120s
+  
+The amount of time that the master will reject offer after declining 
--- End diff --

This doesn't sound correct. The mesos.proto 
(https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto#L2310) 
states:
```
Time to consider unused resources refused. Note that all unused
resources will be considered refused and use the default value
(below) regardless of whether Filters was passed to
SchedulerDriver::launchTasks. You MUST pass Filters with this
field set to change this behavior (i.e., get another offer which
includes unused resources sooner or later than the default).
``` 
Some simple word-smithing or a link should make it clearer.



---




[GitHub] spark pull request #19555: [SPARK-22133][DOCS] Documentation for Mesos Rejec...

2017-10-23 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19555#discussion_r146242613
  
--- Diff: docs/running-on-mesos.md ---
@@ -196,17 +196,18 @@ configuration variables:
 
 * Executor memory: `spark.executor.memory`
 * Executor cores: `spark.executor.cores`
-* Number of executors: `spark.cores.max`/`spark.executor.cores`
+* Number of executors: min(`spark.cores.max`/`spark.executor.cores`, 

+`spark.mem.max`/(`spark.executor.memory`+`spark.mesos.executor.memoryOverhead`))
 
 Please see the [Spark Configuration](configuration.html) page for
 details and default values.
 
 Executors are brought up eagerly when the application starts, until
-`spark.cores.max` is reached.  If you don't set `spark.cores.max`, the
-Spark application will reserve all resources offered to it by Mesos,
-so we of course urge you to set this variable in any sort of
-multi-tenant cluster, including one which runs multiple concurrent
-Spark applications.
+`spark.cores.max` or `spark.mem.max` is reached.  If you don't set 
--- End diff --

Could you please add these changes only in 
https://github.com/apache/spark/pull/19510/? 


---




[GitHub] spark pull request #19555: [SPARK-22133][DOCS] Documentation for Mesos Rejec...

2017-10-23 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19555#discussion_r146242931
  
--- Diff: docs/running-on-mesos.md ---
@@ -344,6 +345,13 @@ See the [configuration page](configuration.html) for 
information on Spark config
   
 
 
+  spark.mem.max
--- End diff --

As above, please add this in the separate PR. 


---




[GitHub] spark pull request #19555: [SPARK-22133][DOCS] Documentation for Mesos Rejec...

2017-10-23 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19555#discussion_r146242882
  
--- Diff: docs/running-on-mesos.md ---
@@ -196,17 +196,18 @@ configuration variables:
 
 * Executor memory: `spark.executor.memory`
 * Executor cores: `spark.executor.cores`
-* Number of executors: `spark.cores.max`/`spark.executor.cores`
+* Number of executors: min(`spark.cores.max`/`spark.executor.cores`, 

+`spark.mem.max`/(`spark.executor.memory`+`spark.mesos.executor.memoryOverhead`))
 
 Please see the [Spark Configuration](configuration.html) page for
 details and default values.
 
 Executors are brought up eagerly when the application starts, until
-`spark.cores.max` is reached.  If you don't set `spark.cores.max`, the
-Spark application will reserve all resources offered to it by Mesos,
-so we of course urge you to set this variable in any sort of
-multi-tenant cluster, including one which runs multiple concurrent
-Spark applications.
+`spark.cores.max` or `spark.mem.max` is reached.  If you don't set 
+`spark.cores.max` and `spark.mem.max`, the Spark application will 
+reserve all resources offered to it by Mesos, so we of course urge 
--- End diff --

`reserve` is probably not the correct term to use here. I would use 
`consume`, as Spark does not actually make resource reservations (see 
http://mesos.apache.org/documentation/latest/reservation/).
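
The executor-count formula quoted in the diff can be sketched as follows (all values are illustrative; integer division mirrors the fact that only whole executors launch):

```scala
val coresMax = 16           // spark.cores.max
val executorCores = 4       // spark.executor.cores
val memMaxMb = 10240        // spark.mem.max, in MB
val executorMemMb = 3072    // spark.executor.memory, in MB
val memoryOverheadMb = 1024 // spark.mesos.executor.memoryOverhead, in MB

val numExecutors = math.min(
  coresMax / executorCores,                      // bound by cores
  memMaxMb / (executorMemMb + memoryOverheadMb)) // bound by memory
```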


---




[GitHub] spark pull request #19543: [SPARK-19606][MESOS] Support constraints in spark...

2017-10-22 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19543#discussion_r146132173
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 ---
@@ -254,6 +254,32 @@ class MesosClusterSchedulerSuite extends SparkFunSuite 
with LocalSparkContext wi
 assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
+  test("declines offers that violate driver constraints") {
+setScheduler()
+
+val mem = 1000
+val cpu = 1
+val s2Attributes = List(Utils.createTextAttribute("c", "u"))
+
+val response = scheduler.submitDriver(
+  new MesosDriverDescription("d1", "jar", mem, cpu, true,
+command,
+Map("spark.mesos.executor.home" -> "test",
+  "spark.app.name" -> "test",
+  "spark.mesos.driver.constraints" -> "c:v"),
--- End diff --

Maybe test multiple constraints too?
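
For reference, a simplified sketch of the `attr:val` constraint format that `spark.mesos.driver.constraints` shares with `spark.mesos.constraints` (this parser is an illustration, not Spark's actual one, and ignores value-less attributes):

```scala
// Parse "c:v;rack:r1,r2" into a map of attribute -> accepted values.
def parseConstraints(s: String): Map[String, Set[String]] =
  s.split(';').filter(_.nonEmpty).map { kv =>
    val Array(attr, values) = kv.split(":", 2)
    attr -> values.split(',').filter(_.nonEmpty).toSet
  }.toMap

val multi = parseConstraints("c:v;rack:r1,r2")
```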


---




[GitHub] spark pull request #19543: [SPARK-19606][MESOS] Support constraints in spark...

2017-10-22 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19543#discussion_r146132144
  
--- Diff: docs/running-on-mesos.md ---
@@ -458,6 +461,13 @@ See the [configuration page](configuration.html) for 
information on Spark config
   
 
 
+  spark.mesos.driver.constraints
+  (none)
+  
+Same as spark.mesos.constraints except applied to drivers.
--- End diff --

Maybe add the this is only valid when launching jobs with the Mesos 
Dispatcher? 


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-22 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
Retest this please. 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-20 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146094019
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Ok, you're probably right. It appears that the YARN code uses 
`setContextClassLoader(userClassLoader)`, whereas Mesos does not have a notion 
of a `userClassLoader`. Therefore we don't need the separate thread in the 
Mesos code. Do I have this correct? Thanks for showing me this!


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-19 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145861062
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Yes, sorry, for some reason I understood you to mean the credential 
renewer itself. I added a comment to the same effect as the YARN analogue.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145301221
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

Yes, I believe so. If you look here 
(https://github.com/apache/spark/blob/6735433cde44b3430fb44edfff58ef8149c66c13/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L271)
 it's the same pattern. The thread needs to run as long as the application 
driver, correct? 


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145300847
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
--- End diff --

I also changed `AMCredentialRenewer` to do the same.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r145298313
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
--- End diff --

fixed.


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r145298003
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -380,7 +389,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   } else {
 declineOffer(
   driver,
-  offer)
+  offer,
--- End diff --

Yeah, for now that's fine. I was thinking of more specific messages, but we can 
address that in a later ticket. 


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r145294156
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -173,6 +178,90 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
 containerInfo
   }
 
+  private def getSecrets(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Secret] = {
+def createValueSecret(data: String): Secret = {
+  Secret.newBuilder()
+.setType(Secret.Type.VALUE)
+.setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
+.build()
+}
+
+def createReferenceSecret(name: String): Secret = {
+  Secret.newBuilder()
+.setReference(Secret.Reference.newBuilder().setName(name))
+.setType(Secret.Type.REFERENCE)
+.build()
+}
+
+val referenceSecrets: Seq[Secret] =
+  conf.get(secretConfig.SECRET_NAMES).getOrElse(Nil).map(s => createReferenceSecret(s))
+
+val valueSecrets: Seq[Secret] = {
+  conf.get(secretConfig.SECRET_VALUES).getOrElse(Nil).map(s => createValueSecret(s))
+}
+
+if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
+  throw new SparkException("Cannot specify VALUE type secrets and REFERENCE types ones")
+}
+
+if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
+  }
+
+  private def illegalSecretInput(dest: Seq[String], secrets: Seq[Secret]): Boolean = {
+if (dest.nonEmpty) {
+  // make sure there is a one-to-one correspondence between destinations and secrets
+  if (dest.length != secrets.length) {
+return true
+  }
+}
+false
+  }
+
+  def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): List[Volume] = {
+val secrets = getSecrets(conf, secretConfig)
+val secretPaths: Seq[String] =
+  conf.get(secretConfig.SECRET_FILENAMES).getOrElse(Nil)
+
+if (illegalSecretInput(secretPaths, secrets)) {
+  throw new SparkException(
+s"Need to give equal numbers of secrets and file paths for file-based " +
+  s"reference secrets got secrets $secrets, and paths $secretPaths")
+}
+
+secrets.zip(secretPaths).map {
+  case (s, p) =>
+val source = Volume.Source.newBuilder()
+  .setType(Volume.Source.Type.SECRET)
+  .setSecret(s)
+Volume.newBuilder()
+  .setContainerPath(p)
+  .setSource(source)
+  .setMode(Volume.Mode.RO)
+  .build
+}.toList
+  }
+
+  def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig):
+  List[Variable] = {
--- End diff --

indentation? 


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r145294352
  
--- Diff: docs/running-on-mesos.md ---
@@ -501,23 +503,74 @@ See the [configuration page](configuration.html) for information on Spark config
 spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
 written to the provided file. Paths are relative to the container's work
 directory.  Absolute paths must already exist.  Consult the Mesos Secret
-protobuf for more information.
+protobuf for more information. Example:
--- End diff --

I'm not sure what the policy should be on this, but IIUC file-based secrets 
do require a backend module. Should we mention that here? 


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r145294074
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -17,10 +17,14 @@
 
 package org.apache.spark.scheduler.cluster.mesos
--- End diff --

Ah ok. One of these days let's make a "clean up" JIRA and harmonize all of 
this code. The naming is also all over the place. 


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r145294610
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -122,7 +126,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
 .toList
   }
 
-  def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
+  def buildContainerInfo(conf: SparkConf):
+  ContainerInfo.Builder = {
--- End diff --

indentation?


---




[GitHub] spark issue #19515: [SPARK-22287][MESOS] SPARK_DAEMON_MEMORY not honored by ...

2017-10-17 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19515
  
Hello @pmackles, thanks for this. It would be set to the value of 
`SPARK_DRIVER_MEMORY` by default, correct? What do you think about introducing a 
new envvar (`SPARK_DISPATCHER_MEMORY`) so that it can be configured 
individually? This is only motivated by the fact that the Dispatcher is a pretty 
unique beast compared to the other Spark daemons. 
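
For illustration, the fallback being proposed could look like the following sketch. `SPARK_DISPATCHER_MEMORY` is the variable suggested above, not an existing setting, and the helper name is invented:

```scala
// Hypothetical resolution order for the dispatcher's JVM memory:
// a dedicated setting first, then the generic daemon setting, then a default.
def dispatcherMemory(env: Map[String, String]): String =
  env.get("SPARK_DISPATCHER_MEMORY")        // proposed, dispatcher-specific
    .orElse(env.get("SPARK_DAEMON_MEMORY")) // shared by all Spark daemons
    .getOrElse("1g")                        // fallback default

// e.g. dispatcherMemory(sys.env)
```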


---




[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-17 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r145292254
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -135,22 +135,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
 new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_ID = "-retry-"
--- End diff --

Sorry, super nit, maybe `RETRY_SEP`. Feel free to ignore. 


---




[GitHub] spark issue #19374: [SPARK-22145][MESOS] fix supervise with checkpointing on...

2017-10-17 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19374
  
LGTM


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-16 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r145010250
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -469,6 +474,12 @@ trait MesosSchedulerUtils extends Logging {
 .setType(Value.Type.RANGES)
 .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
   role.foreach(r => builder.setRole(r))
+
+  if (role.forall(r => !r.equals("*"))) {
--- End diff --

Oh, I thought you could put the whole block into a function; that way, if we 
need to change the way we build the protos some day, it's easy to change in one 
place. 


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-16 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r145009696
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -380,7 +389,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   } else {
 declineOffer(
   driver,
-  offer)
+  offer,
--- End diff --

I've found the `reason` to give less information than desired. 


---




[GitHub] spark issue #18784: [SPARK-21559][Mesos] remove mesos fine-grained mode

2017-10-15 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/18784
  
@jiangxb1987 yes. I'll work to review this ASAP. 


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-13 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
@vanzin Thanks for the review. I'll address the comments ASAP. 


---




[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144682661
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -896,8 +913,8 @@ private[spark] class MesosClusterScheduler(
 revive()
   }
 
-  private def addDriverToPending(desc: MesosDriverDescription, taskId: 
String) = {
-pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, id: String) 
= {
+pendingRetryDriversState.persist(id, desc)
--- End diff --

Maybe keep the name as `subId` because it could be confusing otherwise.


---




[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144682434
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -373,10 +374,16 @@ class SparkContext(config: SparkConf) extends Logging {
 // log out spark.app.name in the Spark driver logs
 logInfo(s"Submitted application: $appName")
 
-// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
-if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
-  throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
-"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
+// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster or
+// System property spark.mesos.driver.frameworkId must be set if user code ran by
+// Mesos Dispatcher on a MESOS cluster
+if (deployMode == "cluster") {
--- End diff --

FWIW, I _believe_ that when we submit a job with the dispatcher, `deployMode` 
is actually set to `client`, so this logic may not be invoked as expected. 


---




[GitHub] spark pull request #19374: [SPARK-22145][MESOS] fix supervise with checkpoin...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19374#discussion_r144682554
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -374,6 +375,15 @@ private[spark] class MesosClusterScheduler(
 s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+val sId = desc.submissionId
+desc.retryState.map(state => sId + s"-retry-${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSumbmissionIdFromTaskId(taskId: String): String = {
+taskId.split("-retry-").head
--- End diff --

Maybe make this a constant? 
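
For illustration, the shared constant might look like the following sketch (the object and method names here are hypothetical, not from the PR):

```scala
// Define the "-retry-" separator once and use it both when building the
// retry task id and when parsing the submission id back out of a task id.
object RetryIds {
  private val RetrySep = "-retry-"

  def driverTaskId(submissionId: String, retries: Option[Int]): String =
    retries.map(r => s"$submissionId$RetrySep$r").getOrElse(submissionId)

  def submissionIdFromTaskId(taskId: String): String =
    taskId.split(RetrySep).head
}
```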


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r144682189
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -28,6 +28,8 @@ import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.FrameworkInfo.Capability
+import org.apache.mesos.Protos.Resource.AllocationInfo
--- End diff --

maybe just use `Protos` imported above? 


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r144682211
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -175,14 +175,22 @@ trait MesosSchedulerUtils extends Logging {
 registerLatch.countDown()
   }
 
-  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+  def createResource(
+   name: String,
+   amount: Double,
+   role: Option[String] = None,
+   allocationInfo: Option[AllocationInfo] = None,
+   reservationInfo: Option[ReservationInfo] = None): Resource = {
 val builder = Resource.newBuilder()
   .setName(name)
   .setType(Value.Type.SCALAR)
   .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
 
 role.foreach { r => builder.setRole(r) }
-
+if (role.forall(r => !r.equals("*"))) {
--- End diff --

Make a constant for the star role (`*`) called `ANY_ROLE`
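
For illustration, the constant being suggested might read as follows (the helper name is invented for the sketch):

```scala
// Name the Mesos wildcard role once so the check reads as intent rather
// than a bare string literal.
val ANY_ROLE = "*"

// true when the role is unset or is a concrete (non-wildcard) role,
// i.e. the cases where reservation info may apply
def isReservableRole(role: Option[String]): Boolean =
  role.forall(_ != ANY_ROLE)
```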


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r144682163
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -380,7 +389,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   } else {
 declineOffer(
   driver,
-  offer)
+  offer,
--- End diff --

As an aside, what do you think about adding more detailed logging of the 
reason why offers are declined? 


---




[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19390#discussion_r144682379
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -469,6 +474,12 @@ trait MesosSchedulerUtils extends Logging {
 .setType(Value.Type.RANGES)
 .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
   role.foreach(r => builder.setRole(r))
+
+  if (role.forall(r => !r.equals("*"))) {
--- End diff --

Maybe abstract this logic into a general function?
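
For illustration, the abstraction might be sketched like this (the helper name and the `ReservationInfo` handling are assumptions, not code from the PR):

```scala
import org.apache.mesos.Protos.Resource

// One helper applies role and reservation protos to any Resource builder,
// so a future change to the proto layout is made in a single place.
def applyRoleInfo(
    builder: Resource.Builder,
    role: Option[String],
    reservationInfo: Option[Resource.ReservationInfo]): Resource.Builder = {
  role.foreach(builder.setRole)
  // only non-wildcard (or unset) roles carry reservation info
  if (role.forall(_ != "*")) {
    reservationInfo.foreach(builder.setReservation)
  }
  builder
}
```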


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144682312
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -17,10 +17,14 @@
 
 package org.apache.spark.scheduler.cluster.mesos
--- End diff --

Out of curiosity, why do we have this file _and_ `MesosSchedulerUtils`?


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144681733
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   containerInfo.addNetworkInfos(info)
 }
 
+getSecretVolume(conf, secretConfig).foreach { volume =>
+  if (volume.getSource.getSecret.getReference.isInitialized) {
+logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
+  s"on file ${volume.getContainerPath}")
+  } else {
+logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+  }
+  containerInfo.addVolumes(volume)
+}
+
 containerInfo
   }
 
+  def addSecretEnvVar(
--- End diff --

Is it possible to make this return `List[Variable]` like it used to, as 
opposed to mutating the `Environment.Builder`? It would be more consistent 
(e.g. with `getSecretVolume`).
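
For illustration, the suggested shape might look like the following sketch (it assumes Mesos protos with `Environment.Variable.setSecret`, i.e. Mesos 1.3+; the method name and parameters are invented):

```scala
import org.apache.mesos.Protos.{Environment, Secret}

// Build and return the variables instead of mutating an Environment.Builder,
// mirroring how getSecretVolume returns List[Volume]; the caller decides
// where the variables go.
def getSecretEnvVars(secrets: Seq[Secret], envKeys: Seq[String]): List[Environment.Variable] =
  secrets.zip(envKeys).map { case (secret, key) =>
    Environment.Variable.newBuilder()
      .setName(key)       // environment variable name from the config
      .setSecret(secret)  // VALUE or REFERENCE secret proto
      .build()
  }.toList
```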


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144680213
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---
@@ -159,7 +160,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   .setCommand(command)
   .setData(ByteString.copyFrom(createExecArg()))
 
-executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
+executorInfo.setContainer(
--- End diff --

I'd almost prefer that we don't add any features to fine-grained right now, 
as we have virtually no test coverage to tell whether this will work. 


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144680478
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   containerInfo.addNetworkInfos(info)
 }
 
+getSecretVolume(conf, secretConfig).foreach { volume =>
+  if (volume.getSource.getSecret.getReference.isInitialized) {
+logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
--- End diff --

Need a space at the end of this log line (my bad!)


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144680608
  
--- Diff: docs/running-on-mesos.md ---
@@ -522,6 +522,43 @@ See the [configuration page](configuration.html) for information on Spark config
 
 
 
+  spark.mesos.executor.secret.envkeys
+  (none)
+  
+A comma-separated list that, if set, the contents of the secret referenced
--- End diff --

What do you think about putting an example here like we do for 
`spark.mesos.network.labels` - something general for all secrets? 
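
For illustration, the kind of example being asked for might look like this (the secret names and env keys are purely invented values):

```
# Illustrative only: expose two reference secrets to the driver as
# environment variables, paired one-to-one by position.
spark.mesos.driver.secret.names=username,password
spark.mesos.driver.secret.envkeys=USERNAME,PASSWORD
```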


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144681758
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   containerInfo.addNetworkInfos(info)
 }
 
+getSecretVolume(conf, secretConfig).foreach { volume =>
+  if (volume.getSource.getSecret.getReference.isInitialized) {
+logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
+  s"on file ${volume.getContainerPath}")
+  } else {
+logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+  }
+  containerInfo.addVolumes(volume)
+}
+
 containerInfo
   }
 
+  def addSecretEnvVar(
+  envBuilder: Environment.Builder,
+  conf: SparkConf,
+  secretConfig: MesosSecretConfig): Unit = {
+getSecretEnvVar(conf, secretConfig).foreach { variable =>
+  if (variable.getSecret.getReference.isInitialized) {
+logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
+  s"on file ${variable.getName}")
+  } else {
+logInfo(s"Setting secret on environment variable name=${variable.getName}")
+  }
+  envBuilder.addVariables(variable)
+}
+  }
+
+  private def getSecrets(conf: SparkConf, secretConfig: MesosSecretConfig):
+  Seq[Secret] = {
--- End diff --

Indentation? 


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144680448
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -122,7 +126,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
 .toList
   }
 
-  def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
+  def buildContainerInfo(conf: SparkConf, secretConfig: MesosSecretConfig):
+  ContainerInfo.Builder = {
 val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
--- End diff --

Should probably have a check here for whether secrets are present, because I 
don't think that secrets will work if you're _not_ also using the Mesos 
containerizer. 
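
For illustration, the guard might be sketched like this (the config keys follow the Spark Mesos docs, but the exact placement inside `buildContainerInfo` and the error wording are assumptions):

```scala
// Fail fast when secrets are configured but the Docker containerizer would
// be used, since secrets rely on the Mesos containerizer.
val secretsConfigured =
  conf.getOption("spark.mesos.driver.secret.names").nonEmpty ||
    conf.getOption("spark.mesos.driver.secret.values").nonEmpty
val usingDockerContainerizer =
  conf.contains("spark.mesos.executor.docker.image") &&
    conf.get("spark.mesos.containerizer", "docker") == "docker"
if (secretsConfigured && usingDockerContainerizer) {
  throw new SparkException(
    "Mesos secrets require the Mesos containerizer (spark.mesos.containerizer=mesos)")
}
```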


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144680489
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -170,9 +175,119 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   containerInfo.addNetworkInfos(info)
 }
 
+getSecretVolume(conf, secretConfig).foreach { volume =>
+  if (volume.getSource.getSecret.getReference.isInitialized) {
+logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
+  s"on file ${volume.getContainerPath}")
+  } else {
+logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+  }
+  containerInfo.addVolumes(volume)
+}
+
 containerInfo
   }
 
+  def addSecretEnvVar(
+  envBuilder: Environment.Builder,
+  conf: SparkConf,
+  secretConfig: MesosSecretConfig): Unit = {
+getSecretEnvVar(conf, secretConfig).foreach { variable =>
+  if (variable.getSecret.getReference.isInitialized) {
+logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
--- End diff --

Space at the end of this log line too.


---




[GitHub] spark pull request #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-13 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19437#discussion_r144680353
  
--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---
@@ -122,7 +126,8 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
 .toList
   }
 
-  def containerInfo(conf: SparkConf): ContainerInfo.Builder = {
+  def buildContainerInfo(conf: SparkConf, secretConfig: MesosSecretConfig):
--- End diff --

Maybe change `secretConfig` to `mesosConfig` and pass the whole thing? That 
way, if we want to add new functionality later, this function is more general. 
Given that most of what we do is proto generation, I bet we'll have to do this 
eventually anyway. 


---




[GitHub] spark issue #19437: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-09 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19437
  
@susanxhuynh @skonto The secret-containing protos are valid in Mesos 1.3 
onwards, which is why the scheduler has that requirement. DC/OS with file-based 
secrets runs Mesos 1.4, which is why we test it there. 


---




[GitHub] spark issue #19272: [Spark-21842][Mesos] Support Kerberos ticket renewal and...

2017-10-04 Thread ArtRand
Github user ArtRand commented on the issue:

https://github.com/apache/spark/pull/19272
  
@kalvinnchau I'm running Hadoop 2.6 on a DC/OS cluster with Mesos 1.4.0 


---



