[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179174442
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+/**
+ * This is an integration test for the cache recovery feature using a 
local spark cluster. It
+ * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot 
of cluster infrastructure.
+ */
+class CacheRecoveryIntegrationSuite extends SparkFunSuite
+with Matchers
+with BeforeAndAfterEach
+with BeforeAndAfterAll
+with Eventually {
+
+  private var conf: SparkConf = makeBaseConf()
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle", numUsableCores = 4)
+  private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, 
null)
+  private val transportContext = new TransportContext(transportConf, 
rpcHandler)
+  private val shuffleService = transportContext.createServer()
+  private var sc: SparkContext = _
+
+  private def makeBaseConf() = new SparkConf()
+.setAppName("test")
+.setMaster("local-cluster[4, 1, 512]")
+.set("spark.dynamicAllocation.enabled", "true")
+.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+.set(EXECUTOR_MEMORY.key, "512m")
+.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+.set(EXECUTOR_INSTANCES.key, "1")
+.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+  override def beforeEach(): Unit = {
+conf = makeBaseConf()
+conf.set("spark.shuffle.service.port", shuffleService.getPort.toString)
+  }
+
+  override def afterEach(): Unit = {
+sc.stop()
+conf = null
+  }
+
+  override def afterAll(): Unit = {
+shuffleService.close()
+  }
+
+  private def getLocations(
+  sc: SparkContext,
+  rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = {
+import scala.collection.breakOut
+val blockIds: Array[BlockId] = rdd.partitions.map(p => 
RDDBlockId(rdd.id, p.index))
+blockIds.map { id =>
+  id -> 
Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty)
+}(breakOut)
+  }
+
+  test("cached data is replicated before dynamic de-allocation") {
+sc = new SparkContext(conf)
+TestUtils.waitUntilExecutorsUp(sc, 4, 6)
+
+val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache()
+rdd.reduce(_ + _) shouldBe 2002000
+sc.getExecutorIds().size shouldBe 4
+getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } 
shouldBe true
+
+eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) {
+  sc.getExecutorIds().size shouldBe 3
+  getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179191524
  
--- Diff: 
core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.{Future, Promise}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.CacheRecoveryManager._
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc._
+import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+class CacheRecoveryManagerSuite
+  extends SparkFunSuite with MockitoSugar with Matchers {
+
+  val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong
+
+  val plentyOfMem = Map(
+BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)),
+BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB)))
+
+  test("replicate blocks until empty and then kill executor") {
+val conf = new SparkConf()
+val eam = mock[ExecutorAllocationManager]
+val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1))
+val bmme = FakeBMM(blocks.iterator, plentyOfMem)
+val bmmeRef = DummyRef(bmme)
+val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf)
+when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1"))
+
+try {
+  val future = cacheRecoveryManager.startCacheRecovery(Seq("1"))
+  val results = ThreadUtils.awaitResult(future, Duration(3, 
TimeUnit.SECONDS))
+  results.head shouldBe DoneRecovering
--- End diff --

Rather than just checking the head, you can check the whole result just as easily.
Also, here too, prefer `assert` over `shouldBe`:

```
assert(results === Seq(DoneRecovering))
```

throughout this file


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r179179616
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Try
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
+
+/**
+ * This is an integration test for the cache recovery feature using a 
local spark cluster. It
+ * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot 
of cluster infrastructure.
+ */
+class CacheRecoveryIntegrationSuite extends SparkFunSuite
+with Matchers
+with BeforeAndAfterEach
+with BeforeAndAfterAll
+with Eventually {
+
+  private var conf: SparkConf = makeBaseConf()
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, 
"shuffle", numUsableCores = 4)
+  private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, 
null)
+  private val transportContext = new TransportContext(transportConf, 
rpcHandler)
+  private val shuffleService = transportContext.createServer()
+  private var sc: SparkContext = _
+
+  private def makeBaseConf() = new SparkConf()
+.setAppName("test")
+.setMaster("local-cluster[4, 1, 512]")
+.set("spark.dynamicAllocation.enabled", "true")
+.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always
+.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s")
+.set(EXECUTOR_MEMORY.key, "512m")
+.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true")
+.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s")
+.set(EXECUTOR_INSTANCES.key, "1")
+.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4")
+.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3")
+
+  override def beforeEach(): Unit = {
+conf = makeBaseConf()
+conf.set("spark.shuffle.service.port", shuffleService.getPort.toString)
+  }
+
+  override def afterEach(): Unit = {
+sc.stop()
+conf = null
+  }
+
+  override def afterAll(): Unit = {
+shuffleService.close()
+  }
+
+  private def getLocations(
+  sc: SparkContext,
+  rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = {
+import scala.collection.breakOut
+val blockIds: Array[BlockId] = rdd.partitions.map(p => 
RDDBlockId(rdd.id, p.index))
+blockIds.map { id =>
+  id -> 
Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty)
+}(breakOut)
+  }
+
+  test("cached data is replicated before dynamic de-allocation") {
+sc = new SparkContext(conf)
+TestUtils.waitUntilExecutorsUp(sc, 4, 6)
+
+val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache()
+rdd.reduce(_ + _) shouldBe 2002000
+sc.getExecutorIds().size shouldBe 4
+getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } 
shouldBe true
+
+eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) {
+  sc.getExecutorIds().size shouldBe 3
+  getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty

[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r179013270
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 verifyTaskLaunched(driver, "o2")
   }
 
+  test("mesos declines offers from blacklisted slave") {
+setBackend()
+
+// launches a task on a valid offer on slave s1
+val minMem = backend.executorMemory(sc) + 1024
+val minCpu = 4
+val offer1 = Resources(minMem, minCpu)
+offerResources(List(offer1))
+verifyTaskLaunched(driver, "o1")
+
+// for any reason executor(aka mesos task) failed on s1
+val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+backend.statusUpdate(driver, status)
+when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1"))
--- End diff --

Just to reiterate my point above: in many cases, having an executor fail 
will *not* lead to `taskScheduler.nodeBlacklist()` changing, as you're assuming 
here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r179012299
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -648,14 +645,8 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   totalGpusAcquired -= gpus
   gpusByTaskId -= taskId
 }
-// If it was a failure, mark the slave as failed for blacklisting 
purposes
 if (TaskState.isFailed(state)) {
-  slave.taskFailures += 1
-
-  if (slave.taskFailures >= MAX_SLAVE_FAILURES) {
-logInfo(s"Blacklisting Mesos slave $slaveId due to too many 
failures; " +
-"is Spark installed on it?")
-  }
+  logError(s"Task $taskId failed on Mesos slave $slaveId.")
--- End diff --

Minor: I think it would be nice to say "Mesos task $taskId...".  Maybe it's 
obvious to those who spend more time with Mesos, but I find I'm easily confused 
by the difference between a Mesos task and a Spark task.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r179012891
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -571,7 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   cpus + totalCoresAcquired <= maxCores &&
   mem <= offerMem &&
   numExecutors < executorLimit &&
-  slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < 
MAX_SLAVE_FAILURES &&
+  !scheduler.nodeBlacklist().contains(offerHostname) &&
--- End diff --

I just want to make really sure everybody understands the big change in 
behavior here -- `nodeBlacklist()` currently *only* gets updated based on 
failures in *spark* tasks.  If a mesos task fails to even start -- that is, if 
a spark executor fails to launch on a node -- `nodeBlacklist` does not get 
updated.  So you could have a node that is misconfigured somehow, and after this 
change you might end up repeatedly trying to launch executors on it, with the 
executor failing to start each time.  That is true even if you have 
blacklisting turned on.

This is SPARK-16630 for the non-mesos case.  That is being actively worked 
on now -- however, the work there will probably have to be yarn-specific, so 
there will still be follow-up work to get the same thing for mesos after that is 
in.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178967925
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
+
+canBeRecovered.flatMap { execIds =>
+  execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
+  Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
+}
+  }
+
+  def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
+val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
+val replicationFuture = replicateUntilDone(execId)
+
+Future.firstCompletedOf(List(timeoutFuture, 
replicationFuture)).andThen {
+  case scala.util.Success(DoneRecovering) =>
+logTrace(s"Done recovering blocks on $execId, killing now")
+  case scala.util.Success(Timeout) =>
+logWarning(s"Couldn't recover cache on $execId before 
$forceKillAfterS second timeout")
+  case Failure(ex) =>
+logWarning(s"Error recovering cache on $execId", ex)
+}.andThen {
+  case _ =>
+kill(execId)
+}
+  }
+
+  /**
+   * Given a list of executors that will be shut down, check if there is 
enough free memory on the
+   * rest of the cluster to hold their data. Return a list of just the 
executors for which there
+   * will be enough space. Executors are included smallest first.
+   *
+   * This is a best guess implementation and it is not guaranteed that all 
returned executors
+   * will succeed. For example a block might be too big to fit on any one 
specific executor.
+   *
+   * @param execIds executors which will be shut down
+   * @

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178966943
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
--- End diff --

Shouldn't you immediately kill the executors that don't pass `checkMem`? 
 Or are they getting killed somewhere else?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178967087
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
+.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS)
+.build[String, String]()
+
+  /**
+   * Start the recover cache shutdown process for these executors
+   *
+   * @param execIds the executors to start shutting down
+   * @return a sequence of futures of Unit that will complete once the 
executor has been killed.
+   */
+  def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = {
+logDebug(s"Recover cached data before shutting down executors 
${execIds.mkString(", ")}.")
+val canBeRecovered: Future[Seq[String]] = checkMem(execIds)
+
+canBeRecovered.flatMap { execIds =>
+  execIds.foreach { execId => recoveringExecutors.put(execId, execId) }
+  Future.sequence(execIds.map { replicateUntilTimeoutThenKill })
+}
+  }
+
+  def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = {
+val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS)
+val replicationFuture = replicateUntilDone(execId)
+
+Future.firstCompletedOf(List(timeoutFuture, 
replicationFuture)).andThen {
+  case scala.util.Success(DoneRecovering) =>
+logTrace(s"Done recovering blocks on $execId, killing now")
+  case scala.util.Success(Timeout) =>
+logWarning(s"Couldn't recover cache on $execId before 
$forceKillAfterS second timeout")
+  case Failure(ex) =>
+logWarning(s"Error recovering cache on $execId", ex)
+}.andThen {
+  case _ =>
+kill(execId)
+}
+  }
+
+  /**
+   * Given a list of executors that will be shut down, check if there is 
enough free memory on the
+   * rest of the cluster to hold their data. Return a list of just the 
executors for which there
+   * will be enough space. Executors are included smallest first.
+   *
+   * This is a best guess implementation and it is not guaranteed that all 
returned executors
+   * will succeed. For example a block might be too big to fit on any one 
specific executor.
+   *
+   * @param execIds executors which will be shut down
+   * @

[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178964472
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -250,6 +255,44 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+  // As a heuristic, prioritize replicating the latest rdd. If this 
succeeds,
+  // CacheRecoveryManager will try to replicate the remaining rdds.
--- End diff --

Rather than replicating the latest RDD first, it would actually make more sense to take 
advantage of the LRU ordering already maintained in the MemoryStore: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L91

But maybe that is not easy to expose.

But I think that also means that the *receiving* end will put the 
replicated block at the back of that LinkedHashMap, even though it really 
hasn't been accessed at all.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178968393
  
--- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala 
---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.Failure
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, 
Timeout}
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Responsible for asynchronously replicating all of an executor's cached 
blocks, and then shutting
+ * it down.
+ */
+private class CacheRecoveryManager(
+blockManagerMasterEndpoint: RpcEndpointRef,
+executorAllocationManager: ExecutorAllocationManager,
+conf: SparkConf)
+  extends Logging {
+
+  private val forceKillAfterS = 
conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT)
+  private val threadPool = 
ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool")
+  private implicit val asyncExecutionContext = 
ExecutionContext.fromExecutorService(threadPool)
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers")
+  private val recoveringExecutors = CacheBuilder.newBuilder()
--- End diff --

I find `recoveringExecutors` pretty confusing. It sounds like executors that 
are recovering from some problem but are going to be OK -- not executors that 
are about to die, which we are recovering data from.  How about 
`drainingExecutors`?  (Though I have a feeling this name may have been 
discussed in earlier rounds of comments and this is what we settled on ... if 
so, that's fine.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-04-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r178959007
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
--- End diff --

Sorry, my comment was worded very poorly.  Is there any reason you wouldn't 
want to transfer the on-disk blocks?  I assume you'd want to replicate all of 
them, and just needed to change the comments elsewhere.  Or is it an intentional tradeoff, 
as users are more likely to limit the amount of in-memory caching to only the 
most important stuff?

Also, just to be really precise -- the check below isn't whether the block 
is currently in memory, it's whether it has been requested to be cached in memory.  It 
may have been cached as MEMORY_AND_DISK but currently only reside on disk.  
Depending on why you want to limit to in-memory only, this may not be 
applicable.  Maybe you actually want `!useDisk`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-23 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
Ah OK, yes, when run in isolation the stage will be 0, so your change makes 
sense.  But that is not what is making it flaky in a full test run.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
> if I execute the test on my machine alone it never pass.

You mean it never fails on your machine, right?  It's only flaky when you 
run everything on Jenkins?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
Hmm, you're right. I was looking at a different branch in my editor and 
didn't notice that it was reset in the code I linked to on master, oops.

I still don't understand your proposed solution, though -- how is checking 
`stageToKill != -1` better than checking `stageToKill > 0` in this case?




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20888
  
I think you're right about killing the wrong stage, but I don't think it's 
exactly what you've outlined.  The original code doesn't try to kill a stage 
with ID == 0 -- instead it's just waiting until that volatile is set to 
something > 0, and then proceeds.  That seems to work fine; we do see that the 
stage does get canceled OK once.

However, I think the problem is that the test [runs twice, with and without codegen](https://github.com/apache/spark/blob/4d37008c78d7d6b8f8a649b375ecc090700eca4f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala#L165).
The first time, it'll always wait until the stage ID is set, because of 
that `eventually { ... stageToKill > 0}`.

However, on the second iteration, `stageToKill` may still be > 0 from 
the first iteration, not because it has been set by the second iteration.  So I 
think you just need to reset the value to -1 between iterations.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-03-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20604
  
@vanzin @sitalkedia @jiangxb1987 I was looking at this code again, and I'd 
appreciate your thoughts on how this relates to 
[SPARK-21834](https://issues.apache.org/jira/browse/SPARK-21834)  
https://github.com/apache/spark/pull/19081

I actually think that SPARK-21834 probably solves the bug I was describing 
initially.  I hit the bug on 2.2.0, and didn't properly understand the change 
of SPARK-21834 when proposing this change.  Nonetheless, I still think this fix 
is a good one -- it improves code clarity in general and fixes a couple other 
minor cases.  I'd also link the issues in jira etc. so the relationship is more 
clear.

I'd go even further and suggest that with this fix in, we can actually 
remove SPARK-21834, as it's no longer necessary.  It's not harmful, but it's just 
confusing.

thoughts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

2018-03-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20657
  
@jerryshao I know you said you wanted to take a deeper look, but it's been a 
while.  Otherwise I'll merge in the next day or two.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20853: [SPARK-23729][SS] Glob resolution is done without the fr...

2018-03-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20853
  
Jenkins, ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20770: [SPARK-23626][CORE] DAGScheduler blocked due to JobSubmi...

2018-03-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20770
  
took a quick look, agree with shivaram's observations, you've got to handle 
`shuffleIdToMapStage` which will not be so easy.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20742: [SPARK-23572][docs] Bring "security.md" up to date.

2018-03-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20742
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20742#discussion_r175171592
  
--- Diff: docs/security.md ---
@@ -3,47 +3,291 @@ layout: global
 displayTitle: Spark Security
 title: Security
 ---
+* This will become a table of contents (this text will be scraped).
+{:toc}
 
-Spark currently supports authentication via a shared secret. 
Authentication can be configured to be on via the `spark.authenticate` 
configuration parameter. This parameter controls whether the Spark 
communication protocols do authentication using the shared secret. This 
authentication is a basic handshake to make sure both sides have the same 
shared secret and are allowed to communicate. If the shared secret is not 
identical they will not be allowed to communicate. The shared secret is created 
as follows:
+# Spark RPC
 
-* For Spark on [YARN](running-on-yarn.html) and local deployments, 
configuring `spark.authenticate` to `true` will automatically handle generating 
and distributing the shared secret. Each application will use a unique shared 
secret.
-* For other types of Spark deployments, the Spark parameter 
`spark.authenticate.secret` should be configured on each of the nodes. This 
secret will be used by all the Master/Workers and applications.
+## Authentication
 
-## Web UI
+Spark currently supports authentication for RPC channels using a shared 
secret. Authentication can
+be turned on by setting the `spark.authenticate` configuration parameter.
 
-The Spark UI can be secured by using [javax servlet 
filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the 
`spark.ui.filters` setting
-and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL 
settings](security.html#ssl-configuration).
+The exact mechanism used to generate and distribute the shared secret is 
deployment-specific.
 
-### Authentication
+For Spark on [YARN](running-on-yarn.html) and local deployments, Spark 
will automatically handle
+generating and distributing the shared secret. Each application will use a 
unique shared secret. In
+the case of YARN, this feature relies on YARN RPC encryption being enabled 
for the distribution of
+secrets to be secure.
 
-A user may want to secure the UI if it has data that other users should 
not be allowed to see. The javax servlet filter specified by the user can 
authenticate the user and then once the user is logged in, Spark can compare 
that user versus the view ACLs to make sure they are authorized to view the UI. 
The configs `spark.acls.enable`, `spark.ui.view.acls` and 
`spark.ui.view.acls.groups` control the behavior of the ACLs. Note that the 
user who started the application always has view access to the UI.  On YARN, 
the Spark UI uses the standard YARN web application proxy mechanism and will 
authenticate via any installed Hadoop filters.
+For other resource managers, `spark.authenticate.secret` must be 
configured on each of the nodes.
+This secret will be shared by all the daemons and applications, so this 
deployment configuration is
+not as secure as the above, especially when considering multi-tenant 
clusters.
 
-Spark also supports modify ACLs to control who has access to modify a 
running Spark application. This includes things like killing the application or 
a task. This is controlled by the configs `spark.acls.enable`, 
`spark.modify.acls` and `spark.modify.acls.groups`. Note that if you are 
authenticating the web UI, in order to use the kill button on the web UI it 
might be necessary to add the users in the modify acls to the view acls also. 
On YARN, the modify acls are passed in and control who has modify access via 
YARN interfaces.
-Spark allows for a set of administrators to be specified in the acls who 
always have view and modify permissions to all the applications. is controlled 
by the configs `spark.admin.acls` and `spark.admin.acls.groups`. This is useful 
on a shared cluster where you might have administrators or support staff who 
help users debug applications.
+
+Property NameDefaultMeaning
+
+  spark.authenticate
+  false
+  Whether Spark authenticates its internal connections.
+
+
+  spark.authenticate.secret
+  None
+  
+The secret key used authentication. See above for when this 
configuration should be set.
+  
+
+
 
-## Event Logging
+## Encryption
 
-If your applications are using event logging, the directory where the 
event logs go (`spark.eventLog.dir`) should be manually created and have the 
proper permissions set on it. If you want those log files secured, the 
permissions should be set to `drwxrwxrwxt` for that directory. The owner of the 
directory should be the super user who is running the history server and the 
group permissions should

[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20742#discussion_r175173523
  
--- Diff: docs/security.md ---
@@ -182,54 +582,70 @@ configure those ports.
   
 
 
-### HTTP Security Headers
 
-Apache Spark can be configured to include HTTP Headers which aids in 
preventing Cross 
-Site Scripting (XSS), Cross-Frame Scripting (XFS), MIME-Sniffing and also 
enforces HTTP 
-Strict Transport Security.
+# Kerberos
+
+Spark supports submitting applications in environments that use Kerberos 
for authentication.
+In most cases, Spark relies on the credentials of the current logged in 
user when authenticating
+to Kerberos-aware services. Such credentials can be obtained by logging in 
to the configured KDC
+with tools like `kinit`.
+
+When talking to Hadoop-based services, Spark needs to obtain delegation 
tokens so that non-local
+processes can authenticate. Spark ships with support for HDFS and other 
Hadoop file systems, Hive
+and HBase.
+
+When using a Hadoop filesystem (such HDFS or WebHDFS), Spark will acquire 
the relevant tokens
+for the service hosting the user's home directory.
+
+An HBase token will be obtained if HBase is in the application's 
classpath, and the HBase
+configuration has Kerberos authentication turned 
(`hbase.security.authentication=kerberos`).
+
+Similarly, a Hive token will be obtained if Hive is in the classpath, and 
the configuration includes
+a URIs for remote metastore services (`hive.metastore.uris` is not empty).
--- End diff --

nit: either "a URI" or "URIs"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20742#discussion_r175170426
  
--- Diff: docs/security.md ---
@@ -3,47 +3,291 @@ layout: global
 displayTitle: Spark Security
 title: Security
 ---
+* This will become a table of contents (this text will be scraped).
+{:toc}
 
-Spark currently supports authentication via a shared secret. 
Authentication can be configured to be on via the `spark.authenticate` 
configuration parameter. This parameter controls whether the Spark 
communication protocols do authentication using the shared secret. This 
authentication is a basic handshake to make sure both sides have the same 
shared secret and are allowed to communicate. If the shared secret is not 
identical they will not be allowed to communicate. The shared secret is created 
as follows:
+# Spark RPC
 
-* For Spark on [YARN](running-on-yarn.html) and local deployments, 
configuring `spark.authenticate` to `true` will automatically handle generating 
and distributing the shared secret. Each application will use a unique shared 
secret.
-* For other types of Spark deployments, the Spark parameter 
`spark.authenticate.secret` should be configured on each of the nodes. This 
secret will be used by all the Master/Workers and applications.
+## Authentication
 
-## Web UI
+Spark currently supports authentication for RPC channels using a shared 
secret. Authentication can
+be turned on by setting the `spark.authenticate` configuration parameter.
 
-The Spark UI can be secured by using [javax servlet 
filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the 
`spark.ui.filters` setting
-and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL 
settings](security.html#ssl-configuration).
+The exact mechanism used to generate and distribute the shared secret is 
deployment-specific.
 
-### Authentication
+For Spark on [YARN](running-on-yarn.html) and local deployments, Spark 
will automatically handle
+generating and distributing the shared secret. Each application will use a 
unique shared secret. In
+the case of YARN, this feature relies on YARN RPC encryption being enabled 
for the distribution of
+secrets to be secure.
 
-A user may want to secure the UI if it has data that other users should 
not be allowed to see. The javax servlet filter specified by the user can 
authenticate the user and then once the user is logged in, Spark can compare 
that user versus the view ACLs to make sure they are authorized to view the UI. 
The configs `spark.acls.enable`, `spark.ui.view.acls` and 
`spark.ui.view.acls.groups` control the behavior of the ACLs. Note that the 
user who started the application always has view access to the UI.  On YARN, 
the Spark UI uses the standard YARN web application proxy mechanism and will 
authenticate via any installed Hadoop filters.
+For other resource managers, `spark.authenticate.secret` must be 
configured on each of the nodes.
+This secret will be shared by all the daemons and applications, so this 
deployment configuration is
+not as secure as the above, especially when considering multi-tenant 
clusters.
 
-Spark also supports modify ACLs to control who has access to modify a 
running Spark application. This includes things like killing the application or 
a task. This is controlled by the configs `spark.acls.enable`, 
`spark.modify.acls` and `spark.modify.acls.groups`. Note that if you are 
authenticating the web UI, in order to use the kill button on the web UI it 
might be necessary to add the users in the modify acls to the view acls also. 
On YARN, the modify acls are passed in and control who has modify access via 
YARN interfaces.
-Spark allows for a set of administrators to be specified in the acls who 
always have view and modify permissions to all the applications. is controlled 
by the configs `spark.admin.acls` and `spark.admin.acls.groups`. This is useful 
on a shared cluster where you might have administrators or support staff who 
help users debug applications.
+
+Property NameDefaultMeaning
+
+  spark.authenticate
+  false
+  Whether Spark authenticates its internal connections.
+
+
+  spark.authenticate.secret
+  None
+  
+The secret key used authentication. See above for when this 
configuration should be set.
+  
+
+
 
-## Event Logging
+## Encryption
 
-If your applications are using event logging, the directory where the 
event logs go (`spark.eventLog.dir`) should be manually created and have the 
proper permissions set on it. If you want those log files secured, the 
permissions should be set to `drwxrwxrwxt` for that directory. The owner of the 
directory should be the super user who is running the history server and the 
group permissions should

[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data

2018-03-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19041
  
Thanks @brad-kaiser -- want to re-iterate my comment from Feb 2nd, I think 
that is really the most important part to address before getting into the 
details of the current implementation:

> Thought some more about the race between RemoveBlock getting sent back 
from the executor vs. when the CacheRecoveryManager tries to replicate the next 
block -- actually, why is there the back-and-forth with the driver for every 
block? Why isn't there just one message from the CacheRecoveryManager to the 
executor, saying "Drain all RDD blocks", and then one message from the executor 
back to the driver when it's done?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...

2018-03-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19041#discussion_r175164254
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
---
@@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint(
 blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  private def recoverLatestRDDBlock(
+  execId: String,
+  excludeExecutors: Seq[String],
+  context: RpcCallContext): Unit = {
+logDebug(s"Replicating first cached block on $execId")
+val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get)
+val response: Option[Future[Boolean]] = for {
+  blockManagerId <- blockManagerIdByExecutor.get(execId)
+  info <- blockManagerInfo.get(blockManagerId)
+  blocks = info.cachedBlocks.collect { case r: RDDBlockId => r }
+  // we assume blocks from the latest rdd are most relevant
--- End diff --

Right, I'm suggesting that you expand the *comment* to be what I had 
written above, so it's a little easier to follow the logic.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20685: [SPARK-23524] Big local shuffle blocks should not be che...

2018-03-07 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20685
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20685: [SPARK-23524] Big local shuffle blocks should not be che...

2018-03-07 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20685
  
I agree with @cloud-fan .  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...

2018-03-07 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20657
  
BTW, I took a look at the code in `MesosHadoopDelegationTokenManager`. There 
seems to be a lot of duplication that could probably be factored out, and I 
wonder whether the things that are different really should be the same.  E.g., Mesos 
doesn't expose a conf for KERBEROS_RELOGIN; it's just using the renewal time 
from the delegation tokens.  It seems pretty easy for that to be wrong.

I can open a separate ticket for that, but wanted to see if this makes sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172954706
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -93,11 +93,24 @@ private[spark] class Client(
 
   private val distCacheMgr = new ClientDistributedCacheManager()
 
-  private var loginFromKeytab = false
-  private var principal: String = null
-  private var keytab: String = null
-  private var credentials: Credentials = null
-  private var amKeytabFileName: String = null
+  private val principal = sparkConf.get(PRINCIPAL).orNull
+  private val keytab = sparkConf.get(KEYTAB).orNull
+  private val loginFromKeytab = principal != null
+  private val amKeytabFileName: String = {
+require((principal == null) == (keytab == null),
+  "Both principal and keytab must be defined, or neither.")
+if (loginFromKeytab) {
+  logInfo(s"Kerberos credentials: principal = $principal, keytab = 
$keytab")
+  // Generate a file name that can be used for the keytab file, that 
does not conflict
+  // with any user file.
+  new File(keytab).getName() + "-" + UUID.randomUUID().toString
+} else {
+  null
+}
+  }
+
+  // Defensive copy of the credentials
+  private val credentials = new 
Credentials(UserGroupInformation.getCurrentUser.getCredentials)
--- End diff --

This appears to be unused.  Did you mean to use this in 
`setupSecurityToken()`?  I'm not really sure what you're defending against with the 
copy; perhaps that should go in the comment as well ... I see it was just in 
the old code, though.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-07 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172963955
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 /**
- * The following methods are primarily meant to make sure long-running 
apps like Spark
- * Streaming apps can run without interruption while accessing secured 
services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new 
credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for 
the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in 
a
- * pre-specified location - the prefix of which is specified in the 
sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named 
c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing 
suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for 
CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens 
renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by 
the application.
  *
- * On the executor and driver (yarn client mode) side, the 
updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has 
elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and 
checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it 
read). If a new file has
- * appeared, it will read the credentials and update the currently running 
UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark 
Streaming apps) can run
+ * without interruption while accessing secured services. It periodically 
logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure 
services to obtain delegation
+ * tokens to be distributed to the rest of the application.
--- End diff --

For folks like me who are less familiar with this, this seems like a good spot to 
explain the overall flow a little bit more.  E.g.:

The KDC provides a ticket granting ticket (TGT), which is then used to 
obtain delegation tokens for each service.  The KDC does not expose the TGT's 
expiry time, so renewal is controlled by a conf (by default 1m, much more 
frequent than usual expiry times).  Each delegation token provider 
should determine the expiry time of its delegation token, so the tokens can be 
renewed appropriately.

(In particular, I needed an extra read to figure out why the TGT had its own 
renewal mechanism.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20685: [SPARK-23524] Big local shuffle blocks should not be che...

2018-03-06 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20685
  
it'll also help with disk corruption ... from the stack traces in 
SPARK-4105 you can't really tell what the source of the problem is.  it'll be 
pretty hard to determine what the source of corruption is if we start seeing it 
again.  anyway, I don't feel that strongly about it either way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172581601
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
   private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: 
SparkConf) {
 UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
 val creds = deserialize(tokens)
-logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+logInfo("Updating delegation tokens for current user.")
--- End diff --

yeah I was thinking it might be handy to have it logged in the executors 
and driver as well, sort of as an RPC id, so you could correlate the log lines, 
in case there was ever a delay in propagation or a failure to get to one 
executor or something, since you're choosing to always log something here.  
Still, your call.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172579936
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
   .checkValue(v => v > 0, "The threshold should be positive.")
   .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+ConfigBuilder("spark.security.credentials.renewalRatio")
+  .doc("Ratio of the credential's expiration time when Spark should 
fetch new credentials.")
+  .doubleConf
+  .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+ConfigBuilder("spark.security.credentials.retryWait")
+  .doc("How long to wait before retrying to fetch new credentials 
after a failure.")
+  .timeConf(TimeUnit.SECONDS)
+  .createWithDefaultString("1h")
--- End diff --

I thought that is what internal meant ... a user *could* specify them, but 
we don't document them at all, so they're not a stable part of the API, etc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172579121
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
 intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
+val corruptStream = mock(classOf[InputStream])
+when(corruptStream.read(any(), any(), any())).thenThrow(new 
IOException("corrupt"))
+val corruptBuffer = mock(classOf[ManagedBuffer])
+when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+doReturn(1L).when(corruptBuffer).size()
+
+val blockManager = mock(classOf[BlockManager])
+val localBmId = BlockManagerId("test-client", "test-client", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+
doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+  ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+)
+
+val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+  ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+)
+
+val transfer = mock(classOf[BlockTransferService])
+when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --

sorry my comment was vague -- I *do* think you can use `createMockTransfer` 
here, since that helper method already exists.

I was just thinking that there may be more we could clean up -- setting up 
the local & remote BlockManager Id, creating the ShuffleIterator, etc. seems to 
have a lot of boilerplate in all the tests.  But let's not do a pure 
refactoring of the other tests in this change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statist...

2018-03-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20740
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172319244
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 /**
- * The following methods are primarily meant to make sure long-running 
apps like Spark
- * Streaming apps can run without interruption while accessing secured 
services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new 
credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for 
the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in 
a
- * pre-specified location - the prefix of which is specified in the 
sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named 
c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing 
suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for 
CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens 
renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by 
the application.
  *
- * On the executor and driver (yarn client mode) side, the 
updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has 
elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and 
checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it 
read). If a new file has
- * appeared, it will read the credentials and update the currently running 
UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark 
Streaming apps) can run
+ * without interruption while accessing secured services. It periodically 
logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure 
services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * New delegation tokens are created once 75% of the renewal interval of 
the original tokens has
+ * elapsed. The new tokens are both added to the current user, and also 
sent to the Spark driver
+ * once it's registered with the AM. The driver is tasked with 
distributing the tokens to other
+ * processes that might need them.
  */
 private[yarn] class AMCredentialRenewer(
 sparkConf: SparkConf,
-hadoopConf: Configuration,
-credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
+hadoopConf: Configuration) extends Logging {
 
-  private var lastCredentialsFileSuffix = 0
+  private val principal = sparkConf.get(PRINCIPAL).get
+  private val keytab = sparkConf.get(KEYTAB).get
+  private val credentialManager = new 
YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
 
-  private val credentialRenewerThread: ScheduledExecutorService =
+  private val renewalExecutor: ScheduledExecutorService =
 ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
 
-  private val hadoopUtil = SparkHadoopUtil.get
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
 
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val daysToKeepFiles = 
sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
-  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
-  private val freshHadoopConf =
-hadoopUtil.getConfBypass

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172322650
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
   .checkValue(v => v > 0, "The threshold should be positive.")
   .createWithDefault(1000)
 
+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+ConfigBuilder("spark.security.credentials.renewalRatio")
+  .doc("Ratio of the credential's expiration time when Spark should 
fetch new credentials.")
+  .doubleConf
+  .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+ConfigBuilder("spark.security.credentials.retryWait")
+  .doc("How long to wait before retrying to fetch new credentials 
after a failure.")
+  .timeConf(TimeUnit.SECONDS)
+  .createWithDefaultString("1h")
--- End diff --

`.internal()` for both


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172323592
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security
 
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils
 
 /**
- * The following methods are primarily meant to make sure long-running 
apps like Spark
- * Streaming apps can run without interruption while accessing secured 
services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new 
credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for 
the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in 
a
- * pre-specified location - the prefix of which is specified in the 
sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named 
c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing 
suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for 
CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens 
renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by 
the application.
  *
- * On the executor and driver (yarn client mode) side, the 
updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has 
elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and 
checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it 
read). If a new file has
- * appeared, it will read the credentials and update the currently running 
UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark 
Streaming apps) can run
+ * without interruption while accessing secured services. It periodically 
logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure 
services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * New delegation tokens are created once 75% of the renewal interval of 
the original tokens has
+ * elapsed. The new tokens are both added to the current user, and also 
sent to the Spark driver
+ * once it's registered with the AM. The driver is tasked with 
distributing the tokens to other
+ * processes that might need them.
  */
 private[yarn] class AMCredentialRenewer(
 sparkConf: SparkConf,
-hadoopConf: Configuration,
-credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
+hadoopConf: Configuration) extends Logging {
 
-  private var lastCredentialsFileSuffix = 0
+  private val principal = sparkConf.get(PRINCIPAL).get
+  private val keytab = sparkConf.get(KEYTAB).get
+  private val credentialManager = new 
YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)
 
-  private val credentialRenewerThread: ScheduledExecutorService =
+  private val renewalExecutor: ScheduledExecutorService =
 ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
 
-  private val hadoopUtil = SparkHadoopUtil.get
+  private val driverRef = new AtomicReference[RpcEndpointRef]()
 
-  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
-  private val daysToKeepFiles = 
sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
-  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
-  private val freshHadoopConf =
-hadoopUtil.getConfBypass

[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172325576
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging {
   private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: 
SparkConf) {
 UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
 val creds = deserialize(tokens)
-logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+logInfo("Updating delegation tokens for current user.")
--- End diff --

just a thought -- rather than just serializing the Credentials, would it be 
helpful to serialize a timestamp when the tokens were obtained and when they 
will be refreshed as well, so it could be logged here?
you have spent more time debugging cases with problems so you will probably 
have a better idea if that would be helpful


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20657#discussion_r172321966
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -1009,7 +987,7 @@ private[spark] class Client(
   }
 
   def setupCredentials(): Unit = {
-loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
+loginFromKeytab = sparkConf.contains(PRINCIPAL)
--- End diff --

if a user only specifies keytab, but no principal, I don't think this will 
fail in a friendly way.  This will be a no-op, so it'll succeed, and then in 
ApplicationMaster / AMCredentialRenewer, you'll get an error trying to do 
`sparkConf.get(PRINCIPAL).get`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172226910
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
 intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
--- End diff --

typo: retried (or maybe "retired", not sure)
though I think a better name would be "big blocks are not checked for 
corruption"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...

2018-03-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20685#discussion_r172232973
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ---
@@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
 intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("big corrupt blocks will not be retiried") {
+val corruptStream = mock(classOf[InputStream])
+when(corruptStream.read(any(), any(), any())).thenThrow(new 
IOException("corrupt"))
+val corruptBuffer = mock(classOf[ManagedBuffer])
+when(corruptBuffer.createInputStream()).thenReturn(corruptStream)
+doReturn(1L).when(corruptBuffer).size()
+
+val blockManager = mock(classOf[BlockManager])
+val localBmId = BlockManagerId("test-client", "test-client", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+
doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
+val localBlockLengths = Seq[Tuple2[BlockId, Long]](
+  ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+)
+
+val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
+val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
+  ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+)
+
+val transfer = mock(classOf[BlockTransferService])
+when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
--- End diff --

you can reuse `createMockTransfer` to simplify this a little.

(actually, a bunch of this test code looks like it could be refactored 
across these tests -- but we can leave that out of this change.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170383918
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
* Request that the cluster manager kill the specified executors.
*
-   * When asking the executor to be replaced, the executor loss is 
considered a failure, and
-   * killed tasks that are running on the executor will count towards the 
failure limits. If no
-   * replacement is being requested, then the tasks will not count towards 
the limit.
-   *
* @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, 
default false
+   * @param adjustTargetNumExecutors whether the target number of 
executors will be adjusted down
+   * after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when 
they are killed, whether
--- End diff --

whoops, I was supposed to set `countFailures = true` in 
`sc.killAndReplaceExecutors`, thanks for catching that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20645: SPARK-23472: Add defaultJavaOptions for drivers and exec...

2018-02-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20645
  
I agree it would be nicer to have this be a more general feature.  I would 
prefer an approach which didn't require a different configuration name, since 
it's more to document & for users to keep track of.  An alternative would be to 
have another syntax for appending to an option, perhaps ++= like Scala, e.g. 
"--conf spark.executor.extraJavaOptions++=..."

You'd still need to tag the ConfigBuilder with what separator to use to 
merge the values together.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20652: [SPARK-23476][CORE] Generate secret in local mode when a...

2018-02-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20652
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
sure @skonto, great to have somebody more knowledgeable on mesos taking a 
closer look at this.

sorry @IgorBerman I promised a quick fix here, but have realized this is 
more complicated than we originally thought.  but the discussion is moving 
forward at least, so thanks for driving it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
cc @attilapiros , you may be interested b/c of how this relates to 
SPARK-16630


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
ok hmm ... so actually this change would lose some important functionality 
then.  unfortunately I don't have a clear picture yet of how to solve 
SPARK-16630 along with the other blacklisting.

sorry this might actually need a more general solution, need to think about 
it a bit more ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
@susanxhuynh good point about changing default behavior.  I'd rather have 
the change so we have more unified behavior between mesos and other cluster 
managers.  But I have never run spark on mesos or even had much interaction 
with users of spark on mesos, so I will defer to others' judgement.  Another 
option: we could leave the old behavior, unless a user sets 
spark.blacklist.enabled=true.  it's a little wonky, but that also guarantees you 
always get some blacklisting.

I've also been considering turning blacklisting on by default in spark 2.4. 
 So far I've had good feedback from users running it (though we'll get way more 
feedback when it's on by default).

btw, one hole in the general blacklisting is handling cases when the 
executors fail to even start: https://issues.apache.org/jira/browse/SPARK-16630

would that be covered by mesos with the old code?  just want to make sure 
we aren't losing that ability.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
thanks @IgorBerman, description looks fine to me now, maybe I saw it wrong 
before.

your test sounds pretty good to me ... you could turn on debug logging for 
MesosCoarseGrainedSchedulerBackend and look for these log msgs:


https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L603

What do you mean "it didn't have much effect" -- sounds like it did exactly 
the right thing?

Sorry, I don't really understand the description of the other bug you 
mentioned.  Why shouldn't it start a 2nd executor on the same slave for the 
same application?  That seems fine until you have enough failures for the node 
blacklisting to take effect.  There is also a small race (that is relatively 
benign) that would allow you to get an executor on a node which you are in 
the middle of blacklisting.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20604
  
Jenkins, retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20604
  
known flaky test: https://issues.apache.org/jira/browse/SPARK-23458


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20635: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization ...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20635
  
thanks @ivoson , merged!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17619: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/17619
  
for anyone watching this: @IgorBerman submitted an updated version of this 
here https://github.com/apache/spark/pull/20640 which I plan to merge unless 
there are any objections.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
lgtm

@IgorBerman can you cleanup the PR description a little?  headers got 
duplicated.  And I'd reword a bit to something like

> This updates the Mesos scheduler to integrate with the common logic for 
node-blacklisting across all cluster managers in BlacklistTracker.  
Specifically, it removes a hardcoded MAX_SLAVE_FAILURES = 2 in 
MesosCoarseGrainedSchedulerBackend, and uses the blacklist from the 
BlacklistTracker, as Yarn does.

> This closes https://github.com/apache/spark/pull/17619

(the last "this closes" bit is useful for some tools we have, it will close 
the other one when this is merged.)

thanks for doing this.  I will probably leave this open for a bit if more 
mesos users have thoughts


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17619: [SPARK-19755][Mesos] Blacklist is always active f...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17619#discussion_r169458952
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -484,7 +481,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   cpus + totalCoresAcquired <= maxCores &&
   mem <= offerMem &&
   numExecutors() < executorLimit &&
-  slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < 
MAX_SLAVE_FAILURES &&
--- End diff --

thanks @skonto , also I realized there were more explicit calls to 
declineOffer than I thought initially after a closer read of the code.  btw 
@IgorBerman has opened an updated version of this PR here 
https://github.com/apache/spark/pull/20640 -- would appreciate a review over 
there


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169438419
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager(
 val executorsRemoved = if (testing) {
   executorIdsToBeRemoved
 } else {
-  client.killExecutors(executorIdsToBeRemoved)
+  // We don't want to change our target number of executors, because 
we already did that
+  // when the task backlog decreased.  Normally there wouldn't be any 
tasks running on these
+  // executors, but maybe the scheduler *just* decided to run a task 
there -- in that case,
--- End diff --

good point,  I didn't look closely enough at the semantics of `force`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169437530
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends 
Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
 schedulerBackend match {
   case b: ExecutorAllocationClient =>
-b.killExecutors(executorIds, replace = false, force = 
true).nonEmpty
+require(executorAllocationManager.isEmpty,
--- End diff --

What would calling this mean with dynamic allocation on? Note that this API
explicitly says it's meant to adjust resource usage downwards. If you've got
just one executor and you kill it, should your app sit with 0 executors?
Or even if you've got 10 executors and you kill one -- when is dynamic
allocation allowed to bump the total back up? I can't think of clear, useful
semantics for this.

(Though this is not necessary to fix the bug; I could pull this out and
move it to a discussion in a new JIRA.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r169436456
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(
 
   // If the new target has not changed, avoid sending a message to the 
cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
+// We lower the target number of executors but don't actively kill 
any yet.  We do this
--- End diff --

I was trying to answer a different question -- if we don't kill the
executor now, why even bother lowering the target number? That would be an
alternative solution: don't adjust the target number here at all, and just wait
until you kill the executors for being idle. (And really, I'm just guessing at
the logic.)

Let me try to reword this a bit ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r169413261
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 verifyTaskLaunched(driver, "o2")
   }
 
+  test("mesos declines offers from blacklisted slave") {
+setBackend()
+
+// launches a task on a valid offer on slave s0
+val minMem = backend.executorMemory(sc) + 1024
+val minCpu = 4
+val offer1 = Resources(minMem, minCpu)
+offerResources(List(offer1))
+verifyTaskLaunched(driver, "o1")
+
+// for any reason executor(aka mesos task) failed on s0
+val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+backend.statusUpdate(driver, status)
+when(taskScheduler.nodeBlacklist()).thenReturn(Set("s1"))
+
+val offer2 = Resources(minMem, minCpu)
+// Launches a new task on a valid offer from the same slave
+offerResources(List(offer2))
+// but since it's blacklisted the offer is declined
+verifyDeclinedOffer(driver, createOfferId("o1"))
--- End diff --

Ah, never mind. I took another look at the code and now I see how this works.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
Thanks for updating. Can you also update the PR description?

Yeah, it's fine to just update this one. In general you can't update others'
PRs unless they give you push permissions to their repos. You *can* start
from their branch and then add your changes on top -- that is a little
preferable, since the commit history then includes their work, so when someone
merges it's a bit more obvious. If you can adjust this PR that way, that would
be nice -- but otherwise it's OK; I will try to remember to adjust attribution
when merging.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...

2018-02-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20640#discussion_r169391079
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 verifyTaskLaunched(driver, "o2")
   }
 
+  test("mesos declines offers from blacklisted slave") {
+setBackend()
+
+// launches a task on a valid offer on slave s0
+val minMem = backend.executorMemory(sc) + 1024
+val minCpu = 4
+val offer1 = Resources(minMem, minCpu)
+offerResources(List(offer1))
+verifyTaskLaunched(driver, "o1")
+
+// for any reason executor(aka mesos task) failed on s0
+val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED)
+backend.statusUpdate(driver, status)
+when(taskScheduler.nodeBlacklist()).thenReturn(Set("s1"))
+
+val offer2 = Resources(minMem, minCpu)
+// Launches a new task on a valid offer from the same slave
+offerResources(List(offer2))
+// but since it's blacklisted the offer is declined
+verifyDeclinedOffer(driver, createOfferId("o1"))
--- End diff --

Will this actually pass? I thought it wouldn't, because the filtering is done
inside `buildMesosTasks`, which never calls `declineOffer` on offers that fail
`canLaunchTask`. (That is a separate thing which needs fixing -- you could open
another issue for it.)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
I understand if you want to do something like this for yourself to get
unblocked, but I'm -1 on merging this because it adds more configs just for a
stopgap.

But I think we agree on the right solution here -- if you post that, I will
try to review it promptly since I've got this paged in (though it might take a
bit to merge as we figure out how to test it ...)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
@IgorBerman I actually think that
https://github.com/apache/spark/pull/17619 is the right approach. As @timout
pointed out on that one, this functionality doesn't need to be covered in
Mesos-specific code at all, as it's covered by the BlacklistTracker. I don't like
introducing new configs when we don't really need them. Other than this being
less invasive, is there another advantage here?

The modifications I suggested to that PR are relatively small -- I think
it's fine if you want to open a PR that is the original updated with my
suggestions (credit still to @timout), as I'm not sure if they're still working
on it. (My fault too, as there was such a long delay before a proper review.)

While I had some open questions, I think it's a clear improvement in any
case. I just need a little help with Mesos testing; we can ask on the dev
list.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...

2018-02-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20640
  
Jenkins, ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20635: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization ...

2018-02-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20635
  
Jenkins, Ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20635: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization ...

2018-02-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20635
  
lgtm assuming tests pass


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20604
  
@tgravescs @vanzin @zsxwing could you take a look? thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20424: [Spark-23240][python] Better error message when extraneo...

2018-02-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20424
  
still lgtm, thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20623: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...

2018-02-15 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/20623


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Executor I...

2018-02-15 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20601
  
ack I merged to master but screwed up on 2.3 -- fixing that here: 
https://github.com/apache/spark/pull/20623


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20623: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...

2018-02-15 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/20623

[SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stag…

…e page

## What changes were proposed in this pull request?

Fixes the exception thrown when sorting tasks by Host / Executor ID:
```
java.lang.IllegalArgumentException: Invalid sort column: Host
at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017)
at 
org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694)
at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61)
at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96)
at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708)
at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293)
at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at 
org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at 
org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
```

It also refactors the code to avoid similar problems by introducing a
constant for each header name and reusing it to look up the
corresponding sort index.

## How was this patch tested?

Manually:

![screen shot 2018-02-13 at 18 57 
10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png)

Author: “attilapiros” <piros.attila.zs...@gmail.com>

Closes #20601 from attilapiros/SPARK-23413.

(cherry picked from commit 1dc2c1d5e85c5f404f470aeb44c1f3c22786bdea)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark fix_backport

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20623.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20623


commit f7a22827694a3aa92e8a7dd20195e2895e86880a
Author: “attilapiros” <piros.attila.zsolt@...>
Date:   2018-02-15T19:51:24Z

[SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page

## What changes were proposed in this pull request?

Fixes the exception thrown when sorting tasks by Host / Executor ID:
```
java.lang.IllegalArgumentException: Invalid sort column: Host
at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017)
at 
org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694)
at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61)
at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96)
at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708)
at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293)
at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at 
org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at 
org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
```

It also refactors the code to avoid similar problems by introducing a
constant for each header name and reusing it to look up the
corresponding sort index.

## How was this patch tested?

Manually:

![screen shot 2018-02-13 at 18 57 
10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png)

Author: “attilapiros” <piros.attila.zs...@gmail.com>

Closes #20601 from attilapiros/SPARK-23413.

(cherry picked from commit 1dc2c1d5e85c5f404f470aeb44c1f3c22786bdea)




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Executor I...

2018-02-15 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20601
  
Everything that might have been affected by this change has passed; the
failures are known flaky tests:

https://issues.apache.org/jira/browse/SPARK-23369

https://issues.apache.org/jira/browse/SPARK-23390

merging to master / 2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17619: [SPARK-19755][Mesos] Blacklist is always active f...

2018-02-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17619#discussion_r168509141
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -484,7 +481,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   cpus + totalCoresAcquired <= maxCores &&
   mem <= offerMem &&
   numExecutors() < executorLimit &&
-  slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < 
MAX_SLAVE_FAILURES &&
--- End diff --

Rather than just deleting this, we should replace it with a check against
`scheduler.nodeBlacklist()`, like YarnSchedulerBackend does here:
https://github.com/apache/spark/blob/44e20c42254bc6591b594f54cd94ced5fcfadae3/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...

2018-02-14 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/17208
  
Hmm, I think you're right @zsxwing that we should be updating `isZombie`
before `sched.dagScheduler.taskEnded` and `sched.dagScheduler.taskSetFailed` are
called, just to keep state consistent. I don't think you'll actually hit the
bug described here, though: (a) if it was from a fetch failure, `isZombie` is
already set first; or (b) if it's just a regular task failure and it leads to
the stage getting aborted, then there aren't any more retries of the stage
anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...

2018-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20601#discussion_r168313710
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -963,33 +965,60 @@ private[ui] class TaskPagedTable(
 
 private object ApiHelper {
 
+  val HEADER_ID = "ID"
+  val HEADER_TASK_INDEX = "Index"
+  val HEADER_ATTEMPT = "Attempt"
+  val HEADER_STATUS = "Status"
+  val HEADER_LOCALITY = "Locality Level"
+  val HEADER_EXECUTOR = "Executor ID"
+  val HEADER_HOST = "Host"
+  val HEADER_LAUNCH_TIME = "Launch Time"
+  val HEADER_DURATION = "Duration"
+  val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+  val HEADER_DESER_TIME = "Task Deserialization Time"
+  val HEADER_GC_TIME = "GC Time"
+  val HEADER_SER_TIME = "Result Serialization Time"
+  val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+  val HEADER_PEAK_MEM = "Peak Execution Memory"
+  val HEADER_ACCUMULATORS = "Accumulators"
+  val HEADER_INPUT_SIZE = "Input Size / Records"
+  val HEADER_OUTPUT_SIZE = "Output Size / Records"
+  val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+  val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+  val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+  val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+  val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+  val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+  val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+  val HEADER_ERROR = "Errors"
 
   private val COLUMN_TO_INDEX = Map(
-"ID" -> null.asInstanceOf[String],
-"Index" -> TaskIndexNames.TASK_INDEX,
-"Attempt" -> TaskIndexNames.ATTEMPT,
-"Status" -> TaskIndexNames.STATUS,
-"Locality Level" -> TaskIndexNames.LOCALITY,
-"Executor ID / Host" -> TaskIndexNames.EXECUTOR,
-"Launch Time" -> TaskIndexNames.LAUNCH_TIME,
-"Duration" -> TaskIndexNames.DURATION,
-"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
-"Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
-"GC Time" -> TaskIndexNames.GC_TIME,
-"Result Serialization Time" -> TaskIndexNames.SER_TIME,
-"Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
-"Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
-"Accumulators" -> TaskIndexNames.ACCUMULATORS,
-"Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
-"Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
-"Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
-"Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
-"Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
-"Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
-"Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
-"Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
-"Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
-"Errors" -> TaskIndexNames.ERROR)
+HEADER_ID -> null.asInstanceOf[String],
+HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+HEADER_STATUS -> TaskIndexNames.STATUS,
+HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+HEADER_HOST -> TaskIndexNames.EXECUTOR,
--- End diff --

Or even go back to the 2.2 behavior, with executor & host in the same
column.

I do think having a separate column for host, and having it be sortable, is
actually better ... but I'm just trying to think of simple solutions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...

2018-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20601#discussion_r168211371
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -963,33 +965,60 @@ private[ui] class TaskPagedTable(
 
 private object ApiHelper {
 
+  val HEADER_ID = "ID"
+  val HEADER_TASK_INDEX = "Index"
+  val HEADER_ATTEMPT = "Attempt"
+  val HEADER_STATUS = "Status"
+  val HEADER_LOCALITY = "Locality Level"
+  val HEADER_EXECUTOR = "Executor ID"
+  val HEADER_HOST = "Host"
+  val HEADER_LAUNCH_TIME = "Launch Time"
+  val HEADER_DURATION = "Duration"
+  val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+  val HEADER_DESER_TIME = "Task Deserialization Time"
+  val HEADER_GC_TIME = "GC Time"
+  val HEADER_SER_TIME = "Result Serialization Time"
+  val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+  val HEADER_PEAK_MEM = "Peak Execution Memory"
+  val HEADER_ACCUMULATORS = "Accumulators"
+  val HEADER_INPUT_SIZE = "Input Size / Records"
+  val HEADER_OUTPUT_SIZE = "Output Size / Records"
+  val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+  val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+  val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+  val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+  val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+  val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+  val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+  val HEADER_ERROR = "Errors"
 
   private val COLUMN_TO_INDEX = Map(
-"ID" -> null.asInstanceOf[String],
-"Index" -> TaskIndexNames.TASK_INDEX,
-"Attempt" -> TaskIndexNames.ATTEMPT,
-"Status" -> TaskIndexNames.STATUS,
-"Locality Level" -> TaskIndexNames.LOCALITY,
-"Executor ID / Host" -> TaskIndexNames.EXECUTOR,
-"Launch Time" -> TaskIndexNames.LAUNCH_TIME,
-"Duration" -> TaskIndexNames.DURATION,
-"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
-"Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
-"GC Time" -> TaskIndexNames.GC_TIME,
-"Result Serialization Time" -> TaskIndexNames.SER_TIME,
-"Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
-"Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
-"Accumulators" -> TaskIndexNames.ACCUMULATORS,
-"Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
-"Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
-"Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
-"Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
-"Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
-"Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
-"Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
-"Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
-"Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
-"Errors" -> TaskIndexNames.ERROR)
+HEADER_ID -> null.asInstanceOf[String],
+HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+HEADER_STATUS -> TaskIndexNames.STATUS,
+HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+HEADER_HOST -> TaskIndexNames.EXECUTOR,
--- End diff --

Another alternative is to disable sorting by host, and just fix sorting by
executor. That could go into 2.3.1 without breaking compatibility.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20596: [SPARK-23404][CORE]When the underlying buffers are direc...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20596
  
Have you seen a case where an off-heap buffer is actually passed in,
even though the desired storage is on-heap? E.g., if it comes from the block
transfer service, then I think it will always be on-heap:


https://github.com/apache/spark/blob/2ee76c22b6e48e643694c9475e5f0d37124215e7/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala#L108


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20474
  
merged to master


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20604: [WIP][SPARK-23365][CORE] Do not adjust num execut...

2018-02-13 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/20604

[WIP][SPARK-23365][CORE] Do not adjust num executors when killing idle 
executors.

The ExecutorAllocationManager should not adjust the target number of
executors when killing idle executors, as it has already adjusted the
target number down based on the task backlog.

The change is more than just flipping the value of `replace` because
`replace` also implied failure handling.  Furthermore, the name
`replace` was misleading with DynamicAllocation on, as the target number
of executors is changed outside of the call to `killExecutors`.

TODO:
testing

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-23365

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20604.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20604


commit b5a39dab324be4bb358682720cf0f7e55272559d
Author: Imran Rashid <irashid@...>
Date:   2018-02-13T22:07:26Z

[SPARK-23365] Do not adjust num executors when killing idle executors.

The ExecutorAllocationManager should not adjust the target number of
executors when killing idle executors, as it has already adjusted the
target number down based on the task backlog.

The change is more than just flipping the value of `replace` because
`replace` also implied failure handling.  Furthermore, the name
`replace` was misleading with DynamicAllocation on, as the target number
of executors is changed outside of the call to `killExecutors`.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20532: [SPARK-23353][CORE] Allow ExecutorMetricsUpdate events t...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20532
  
I agree with @jiangxb1987 ... we already have issues with event logs being
too big: the driver gets backlogged just writing them out, and then the
history server takes a long time to parse those files. There have been recent
improvements there, but that doesn't mean we should reintroduce the problem.

I'm not saying this doesn't have a use; I'd just like to figure out if this is
the best way to do it. If it only has one very specific use case for
@LantaoJin, then maybe they have an alternative that still uses public APIs, with
a custom listener as I suggested. I worry a user might turn this on (why not,
more data is better) and then later hit other scaling challenges and not
realize this was the problem.

Or if this does have some general use case for all users, then maybe it's
fine, but I haven't seen that yet. And maybe there is a better way to do that
... do we need another way to get detailed output metrics from executors that
doesn't have some of the scaling challenges of the event log?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...

2018-02-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20601#discussion_r167991914
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -963,33 +965,60 @@ private[ui] class TaskPagedTable(
 
 private object ApiHelper {
 
+  val HEADER_ID = "ID"
+  val HEADER_TASK_INDEX = "Index"
+  val HEADER_ATTEMPT = "Attempt"
+  val HEADER_STATUS = "Status"
+  val HEADER_LOCALITY = "Locality Level"
+  val HEADER_EXECUTOR = "Executor ID"
+  val HEADER_HOST = "Host"
+  val HEADER_LAUNCH_TIME = "Launch Time"
+  val HEADER_DURATION = "Duration"
+  val HEADER_SCHEDULER_DELAY = "Scheduler Delay"
+  val HEADER_DESER_TIME = "Task Deserialization Time"
+  val HEADER_GC_TIME = "GC Time"
+  val HEADER_SER_TIME = "Result Serialization Time"
+  val HEADER_GETTING_RESULT_TIME = "Getting Result Time"
+  val HEADER_PEAK_MEM = "Peak Execution Memory"
+  val HEADER_ACCUMULATORS = "Accumulators"
+  val HEADER_INPUT_SIZE = "Input Size / Records"
+  val HEADER_OUTPUT_SIZE = "Output Size / Records"
+  val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time"
+  val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records"
+  val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads"
+  val HEADER_SHUFFLE_WRITE_TIME = "Write Time"
+  val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records"
+  val HEADER_MEM_SPILL = "Shuffle Spill (Memory)"
+  val HEADER_DISK_SPILL = "Shuffle Spill (Disk)"
+  val HEADER_ERROR = "Errors"
 
   private val COLUMN_TO_INDEX = Map(
-"ID" -> null.asInstanceOf[String],
-"Index" -> TaskIndexNames.TASK_INDEX,
-"Attempt" -> TaskIndexNames.ATTEMPT,
-"Status" -> TaskIndexNames.STATUS,
-"Locality Level" -> TaskIndexNames.LOCALITY,
-"Executor ID / Host" -> TaskIndexNames.EXECUTOR,
-"Launch Time" -> TaskIndexNames.LAUNCH_TIME,
-"Duration" -> TaskIndexNames.DURATION,
-"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
-"Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
-"GC Time" -> TaskIndexNames.GC_TIME,
-"Result Serialization Time" -> TaskIndexNames.SER_TIME,
-"Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
-"Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
-"Accumulators" -> TaskIndexNames.ACCUMULATORS,
-"Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
-"Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
-"Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
-"Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
-"Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
-"Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
-"Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
-"Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
-"Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
-"Errors" -> TaskIndexNames.ERROR)
+HEADER_ID -> null.asInstanceOf[String],
+HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX,
+HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT,
+HEADER_STATUS -> TaskIndexNames.STATUS,
+HEADER_LOCALITY -> TaskIndexNames.LOCALITY,
+HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
+HEADER_HOST -> TaskIndexNames.EXECUTOR,
--- End diff --

sorting by host and executor is not the same ... you might have executors 1 
& 5 on host A, and executors 2, 3, 4 on host B. 

The 2.2 UI had both executor and host in the same column: 
https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L1203

I think we either need to go back to having one column for both, or add an 
index on host.

thoughts @vanzin ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20424: [Spark-23240][python] Better error message when extraneo...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20424
  
ah got it.  sounds good to me, I will defer to @HyukjinKwon 's judgement.  
I think this change looks fine


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20474
  
lgtm


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19788
  
Jenkins, retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20424: [Spark-23240][python] Better error message when extraneo...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20424
  
lgtm
@bersprockets you mentioned wanting to try the other route as well -- what's 
the status on that?  should we still wait on this one?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...

2018-02-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20474#discussion_r167917613
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala 
---
@@ -51,6 +51,29 @@ private[v1] class AbstractApplicationResource extends 
BaseAppResource {
   @Path("executors")
   def executorList(): Seq[ExecutorSummary] = 
withUI(_.store.executorList(true))
 
+  @GET
+  @Path("executors/{executorId}/threads")
+  def threadDump(@PathParam("executorId") execId: String): 
Array[ThreadStackTrace] = withUI { ui =>
+if (execId != SparkContext.DRIVER_IDENTIFIER && 
!execId.forall(Character.isDigit)) {
+  throw new BadParameterException(
+s"Invalid executorId: neither '${SparkContext.DRIVER_IDENTIFIER}' 
nor number.")
+}
+
+val safeSparkContext = ui.sc.getOrElse {
+  throw new ServiceUnavailable("Thread dumps not available through the 
history server.")
+}
+
+ui.store.asOption(ui.store.executorSummary(execId)) match {
+  case Some(executorSummary) if executorSummary.isActive =>
+  val safeThreadDump = 
safeSparkContext.getExecutorThreadDump(execId).getOrElse {
+throw new NotFoundException("No thread dump is available.")
+  }
+  return safeThreadDump
--- End diff --

you don't need `return` here


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...

2018-02-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20474#discussion_r167917733
  
--- Diff: docs/monitoring.md ---
@@ -347,6 +347,10 @@ can be identified by their `[attempt-id]`. In the API 
listed below, when running
 /applications/[app-id]/executors
 A list of all active executors for the given application.
   
+  
+
/applications/[app-id]/executors/[executor-id]/threads
+Stack traces of all the threads running within the given active 
executor.
--- End diff --

Add that this is not available via the history server.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20244
  
merged to master / 2.3 / 2.2

I hit a merge conflict trying to merge to 2.1 -- feel free to open another 
PR for that version.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...

2018-02-13 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20408
  
merged to master, thanks everyone


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20532: [SPARK-23353][CORE] Allow ExecutorMetricsUpdate events t...

2018-02-08 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20532
  
I can see why you want this sometimes, but I'm trying to figure out if it's 
really valuable for users in general.  You could always add a custom listener 
to log this info.  It would go into a separate file, not the std event log file, 
which means you'd have a little more work to do to stitch them together.  OTOH 
that could be a good thing, as it means the history server wouldn't have to 
parse those extra lines.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r167138603
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
--- End diff --

hi @ivoson -- I haven't come up with a better way to test this, so I think 
for now you should

(1) change the PR to *only* include the changes to the DAGScheduler (also 
undo the `protected[spark]` changes elsewhere)
(2) put this repro on the jira as it's a pretty good illustration of what's going 
on.

if we come up with a way to test it, we can always do that later on.

thanks and sorry for the back and forth


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...

2018-02-08 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20408
  
just a quick note -- I realized I was confused about one part of the inner 
workings of the history server which I want to confirm before I merge this, but 
got sick and now have a bit of a backlog.  Will get back to this next week.  
Sorry for the delay


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166458357
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
--- End diff --

hi @ivoson  -- I'm really sorry but I only just realized that this "test" 
is really just a repro, and it passes both before and after the actual code 
changes, since you've replicated the internal logic we're fixing.  As such, I 
don't think it's actually useful as a test case -- perhaps it should get added 
to the jira as a repro.

I appreciate the work that went into writing this as it helped make the 
issue clear to me.  I am not sure if there is a good way to test this.  If we 
can't come up with anything, we should just commit your actual fix, but give me 
a day or two to think about it ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20451: [SPARK-23146][WIP] Support client mode for Kubern...

2018-02-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20451#discussion_r166087229
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+private[spark] object OptionRequirements {
+
+  def requireBothOrNeitherDefined(
--- End diff --

not used, and if it were, perhaps would be clearer as

```scala
(opt1, opt2) match {
  case (Some(val1), None) =>
...
  case (None, Some(val2)) =>
...
  case _ => // ok
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20493: [SPARK-23326][WEBUI]schedulerDelay should return ...

2018-02-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20493#discussion_r166068708
  
--- Diff: 
core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.Date
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
+
+class AppStatusUtilsSuite extends SparkFunSuite {
+
+  test("schedulerDelay") {
+val runningTask = new TaskData(
--- End diff --

+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17422#discussion_r165772194
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -429,15 +429,42 @@ private[spark] class Executor(
 
 case t: TaskKilledException =>
   logInfo(s"Executor killed $taskName (TID $taskId), reason: 
${t.reason}")
+
+  // Collect latest accumulator values to report back to the driver
+  val accums: Seq[AccumulatorV2[_, _]] =
+if (task != null) {
+  task.metrics.setExecutorRunTime(System.currentTimeMillis() - 
taskStart)
+  task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+  task.collectAccumulatorUpdates(taskFailed = true)
+} else {
+  Seq.empty
+}
+  val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), 
None))
--- End diff --

this should be refactored, and not repeated 3 times.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17422#discussion_r165772055
  
--- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala ---
@@ -212,9 +212,19 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
-  override def toErrorString: String = s"TaskKilled ($reason)"
+case class TaskKilled(
+reason: String,
+accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
+  extends TaskFailedReason {
+
+  override def toErrorString: String = "TaskKilled ($reason)"
   override def countTowardsTaskFailures: Boolean = false
+
+  private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): 
TaskKilled = {
--- End diff --

I don't think this method is really necessary at all, you could just pass 
it in the constructor in the places it's used.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17422: [SPARK-20087] Attach accumulators / metrics to 'TaskKill...

2018-02-02 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/17422
  
@advancedxy this has been quiet for a long time, so I suggest you just take 
it over.  I actually think this is so close to complete that very little would 
need to be done, and credit would most likely go to @noodle-fb .  That said, we 
may need a little more input on whether or not this is desirable, as it will 
change the meaning of the aggregated metrics.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



<    3   4   5   6   7   8   9   10   11   12   >