[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r179174442 --- Diff: core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Try + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.{Seconds, Span} + +import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite, TestUtils} +import org.apache.spark.internal.config._ +import org.apache.spark.network.TransportContext +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.rdd.RDD +import org.apache.spark.storage._ + +/** + * This is an integration test for the cache recovery feature using a local spark cluster. It + * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot of cluster infrastructure. 
+ */ +class CacheRecoveryIntegrationSuite extends SparkFunSuite +with Matchers +with BeforeAndAfterEach +with BeforeAndAfterAll +with Eventually { + + private var conf: SparkConf = makeBaseConf() + private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 4) + private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, null) + private val transportContext = new TransportContext(transportConf, rpcHandler) + private val shuffleService = transportContext.createServer() + private var sc: SparkContext = _ + + private def makeBaseConf() = new SparkConf() +.setAppName("test") +.setMaster("local-cluster[4, 1, 512]") +.set("spark.dynamicAllocation.enabled", "true") +.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always +.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s") +.set(EXECUTOR_MEMORY.key, "512m") +.set(SHUFFLE_SERVICE_ENABLED.key, "true") +.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true") +.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s") +.set(EXECUTOR_INSTANCES.key, "1") +.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4") +.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3") + + override def beforeEach(): Unit = { +conf = makeBaseConf() +conf.set("spark.shuffle.service.port", shuffleService.getPort.toString) + } + + override def afterEach(): Unit = { +sc.stop() +conf = null + } + + override def afterAll(): Unit = { +shuffleService.close() + } + + private def getLocations( + sc: SparkContext, + rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = { +import scala.collection.breakOut +val blockIds: Array[BlockId] = rdd.partitions.map(p => RDDBlockId(rdd.id, p.index)) +blockIds.map { id => + id -> Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty) +}(breakOut) + } + + test("cached data is replicated before dynamic de-allocation") { +sc = new SparkContext(conf) +TestUtils.waitUntilExecutorsUp(sc, 4, 6) + +val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache() 
+rdd.reduce(_ + _) shouldBe 2002000 +sc.getExecutorIds().size shouldBe 4 +getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } shouldBe true + +eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) { + sc.getExecutorIds().size shouldBe 3 + getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r179191524 --- Diff: core/src/test/scala/org/apache/spark/CacheRecoveryManagerSuite.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.{Future, Promise} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.reflect.ClassTag + +import org.mockito.Mockito._ +import org.scalatest.Matchers +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.CacheRecoveryManager._ +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.network.util.ByteUnit +import org.apache.spark.rpc._ +import org.apache.spark.storage.{BlockId, BlockManagerId, RDDBlockId} +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +class CacheRecoveryManagerSuite + extends SparkFunSuite with MockitoSugar with Matchers { + + val oneGB: Long = ByteUnit.GiB.toBytes(1).toLong + + val plentyOfMem = Map( +BlockManagerId("1", "host", 12, None) -> ((oneGB, oneGB)), +BlockManagerId("2", "host", 12, None) -> ((oneGB, oneGB)), +BlockManagerId("3", "host", 12, None) -> ((oneGB, oneGB))) + + test("replicate blocks until empty and then kill executor") { +val conf = new SparkConf() +val eam = mock[ExecutorAllocationManager] +val blocks = Seq(RDDBlockId(1, 1), RDDBlockId(2, 1)) +val bmme = FakeBMM(blocks.iterator, plentyOfMem) +val bmmeRef = DummyRef(bmme) +val cacheRecoveryManager = new CacheRecoveryManager(bmmeRef, eam, conf) +when(eam.killExecutors(Seq("1"))).thenReturn(Seq("1")) + +try { + val future = cacheRecoveryManager.startCacheRecovery(Seq("1")) + val results = ThreadUtils.awaitResult(future, Duration(3, TimeUnit.SECONDS)) + results.head shouldBe DoneRecovering --- End diff -- rather than just checking head, you can check the whole result as easily. 
And also here, prefer `assert` over `shouldBe` throughout this file, e.g. ``` assert(results === Seq(DoneRecovering)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r179179616 --- Diff: core/src/test/scala/org/apache/spark/scheduler/CacheRecoveryIntegrationSuite.scala --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Try + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers} +import org.scalatest.concurrent.Eventually +import org.scalatest.time.{Seconds, Span} + +import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite, TestUtils} +import org.apache.spark.internal.config._ +import org.apache.spark.network.TransportContext +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.rdd.RDD +import org.apache.spark.storage._ + +/** + * This is an integration test for the cache recovery feature using a local spark cluster. It + * extends the unit tests in CacheRecoveryManagerSuite which mocks a lot of cluster infrastructure. 
+ */ +class CacheRecoveryIntegrationSuite extends SparkFunSuite +with Matchers +with BeforeAndAfterEach +with BeforeAndAfterAll +with Eventually { + + private var conf: SparkConf = makeBaseConf() + private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 4) + private val rpcHandler = new ExternalShuffleBlockHandler(transportConf, null) + private val transportContext = new TransportContext(transportConf, rpcHandler) + private val shuffleService = transportContext.createServer() + private var sc: SparkContext = _ + + private def makeBaseConf() = new SparkConf() +.setAppName("test") +.setMaster("local-cluster[4, 1, 512]") +.set("spark.dynamicAllocation.enabled", "true") +.set("spark.dynamicAllocation.executorIdleTimeout", "1s") // always +.set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1s") +.set(EXECUTOR_MEMORY.key, "512m") +.set(SHUFFLE_SERVICE_ENABLED.key, "true") +.set(DYN_ALLOCATION_CACHE_RECOVERY.key, "true") +.set(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT.key, "500s") +.set(EXECUTOR_INSTANCES.key, "1") +.set(DYN_ALLOCATION_INITIAL_EXECUTORS.key, "4") +.set(DYN_ALLOCATION_MIN_EXECUTORS.key, "3") + + override def beforeEach(): Unit = { +conf = makeBaseConf() +conf.set("spark.shuffle.service.port", shuffleService.getPort.toString) + } + + override def afterEach(): Unit = { +sc.stop() +conf = null + } + + override def afterAll(): Unit = { +shuffleService.close() + } + + private def getLocations( + sc: SparkContext, + rdd: RDD[_]): Map[BlockId, Map[BlockManagerId, BlockStatus]] = { +import scala.collection.breakOut +val blockIds: Array[BlockId] = rdd.partitions.map(p => RDDBlockId(rdd.id, p.index)) +blockIds.map { id => + id -> Try(sc.env.blockManager.master.getBlockStatus(id)).getOrElse(Map.empty) +}(breakOut) + } + + test("cached data is replicated before dynamic de-allocation") { +sc = new SparkContext(conf) +TestUtils.waitUntilExecutorsUp(sc, 4, 6) + +val rdd = sc.parallelize(1 to 1000, 4).map(_ * 4).cache() 
+rdd.reduce(_ + _) shouldBe 2002000 +sc.getExecutorIds().size shouldBe 4 +getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty } shouldBe true + +eventually(timeout(Span(5, Seconds)), interval(Span(1, Seconds))) { + sc.getExecutorIds().size shouldBe 3 + getLocations(sc, rdd).forall { case (_, map) => map.nonEmpty
[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r179013270 --- Diff: resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala --- @@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite verifyTaskLaunched(driver, "o2") } + test("mesos declines offers from blacklisted slave") { +setBackend() + +// launches a task on a valid offer on slave s1 +val minMem = backend.executorMemory(sc) + 1024 +val minCpu = 4 +val offer1 = Resources(minMem, minCpu) +offerResources(List(offer1)) +verifyTaskLaunched(driver, "o1") + +// for any reason executor(aka mesos task) failed on s1 +val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED) +backend.statusUpdate(driver, status) +when(taskScheduler.nodeBlacklist()).thenReturn(Set("hosts1")) --- End diff -- Just to reiterate my point above -- in many cases, having an executor fail will *not* lead to `taskScheduler.nodeBlacklist()` changing the way you are mocking it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r179012299 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -648,14 +645,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( totalGpusAcquired -= gpus gpusByTaskId -= taskId } -// If it was a failure, mark the slave as failed for blacklisting purposes if (TaskState.isFailed(state)) { - slave.taskFailures += 1 - - if (slave.taskFailures >= MAX_SLAVE_FAILURES) { -logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + -"is Spark installed on it?") - } + logError(s"Task $taskId failed on Mesos slave $slaveId.") --- End diff -- Minor: I think it would be nice to say "Mesos task $taskId...". Maybe it's obvious for those spending more time with Mesos, but I find I'm easily confused by the difference between a Mesos task and a Spark task. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r179012891 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -571,7 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && + !scheduler.nodeBlacklist().contains(offerHostname) && --- End diff -- I just want to make really sure everybody understands the big change in behavior here -- `nodeBlacklist()` currently *only* gets updated based on failures in *spark* tasks. If a Mesos task fails to even start -- that is, if a Spark executor fails to launch on a node -- `nodeBlacklist` does not get updated. So you could have a node that is misconfigured somehow, and after this change you might end up repeatedly trying to launch executors on it, with the executor failing to even start each time. That is true even if you have blacklisting on. This is SPARK-16630 for the non-Mesos case. That is being actively worked on now -- however, the work there will probably have to be YARN-specific, so there will still be follow-up work to get the same thing for Mesos after that is in. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r178967925 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Failure + +import com.google.common.cache.CacheBuilder + +import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, Timeout} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting + * it down. 
+ */ +private class CacheRecoveryManager( +blockManagerMasterEndpoint: RpcEndpointRef, +executorAllocationManager: ExecutorAllocationManager, +conf: SparkConf) + extends Logging { + + private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT) + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool") + private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(threadPool) + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers") + private val recoveringExecutors = CacheBuilder.newBuilder() +.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS) +.build[String, String]() + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + * @return a sequence of futures of Unit that will complete once the executor has been killed. + */ + def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +val canBeRecovered: Future[Seq[String]] = checkMem(execIds) + +canBeRecovered.flatMap { execIds => + execIds.foreach { execId => recoveringExecutors.put(execId, execId) } + Future.sequence(execIds.map { replicateUntilTimeoutThenKill }) +} + } + + def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = { +val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS) +val replicationFuture = replicateUntilDone(execId) + +Future.firstCompletedOf(List(timeoutFuture, replicationFuture)).andThen { + case scala.util.Success(DoneRecovering) => +logTrace(s"Done recovering blocks on $execId, killing now") + case scala.util.Success(Timeout) => +logWarning(s"Couldn't recover cache on $execId before $forceKillAfterS second timeout") + case Failure(ex) => +logWarning(s"Error recovering cache on $execId", ex) +}.andThen { + case _ => +kill(execId) +} + } + + /** + * 
Given a list of executors that will be shut down, check if there is enough free memory on the + * rest of the cluster to hold their data. Return a list of just the executors for which there + * will be enough space. Executors are included smallest first. + * + * This is a best guess implementation and it is not guaranteed that all returned executors + * will succeed. For example a block might be too big to fit on any one specific executor. + * + * @param execIds executors which will be shut down + * @
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r178966943 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Failure + +import com.google.common.cache.CacheBuilder + +import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, Timeout} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting + * it down. 
+ */ +private class CacheRecoveryManager( +blockManagerMasterEndpoint: RpcEndpointRef, +executorAllocationManager: ExecutorAllocationManager, +conf: SparkConf) + extends Logging { + + private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT) + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool") + private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(threadPool) + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers") + private val recoveringExecutors = CacheBuilder.newBuilder() +.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS) +.build[String, String]() + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + * @return a sequence of futures of Unit that will complete once the executor has been killed. + */ + def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +val canBeRecovered: Future[Seq[String]] = checkMem(execIds) --- End diff -- Shouldn't you immediately kill those executors which don't pass `checkMem`? Or are they getting killed somewhere else? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r178967087 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Failure + +import com.google.common.cache.CacheBuilder + +import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, Timeout} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting + * it down. 
+ */ +private class CacheRecoveryManager( +blockManagerMasterEndpoint: RpcEndpointRef, +executorAllocationManager: ExecutorAllocationManager, +conf: SparkConf) + extends Logging { + + private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT) + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool") + private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(threadPool) + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers") + private val recoveringExecutors = CacheBuilder.newBuilder() +.expireAfterWrite(forceKillAfterS * 2, TimeUnit.SECONDS) +.build[String, String]() + + /** + * Start the recover cache shutdown process for these executors + * + * @param execIds the executors to start shutting down + * @return a sequence of futures of Unit that will complete once the executor has been killed. + */ + def startCacheRecovery(execIds: Seq[String]): Future[Seq[KillReason]] = { +logDebug(s"Recover cached data before shutting down executors ${execIds.mkString(", ")}.") +val canBeRecovered: Future[Seq[String]] = checkMem(execIds) + +canBeRecovered.flatMap { execIds => + execIds.foreach { execId => recoveringExecutors.put(execId, execId) } + Future.sequence(execIds.map { replicateUntilTimeoutThenKill }) +} + } + + def replicateUntilTimeoutThenKill(execId: String): Future[KillReason] = { +val timeoutFuture = returnAfterTimeout(Timeout, forceKillAfterS) +val replicationFuture = replicateUntilDone(execId) + +Future.firstCompletedOf(List(timeoutFuture, replicationFuture)).andThen { + case scala.util.Success(DoneRecovering) => +logTrace(s"Done recovering blocks on $execId, killing now") + case scala.util.Success(Timeout) => +logWarning(s"Couldn't recover cache on $execId before $forceKillAfterS second timeout") + case Failure(ex) => +logWarning(s"Error recovering cache on $execId", ex) +}.andThen { + case _ => +kill(execId) +} + } + + /** + * 
Given a list of executors that will be shut down, check if there is enough free memory on the + * rest of the cluster to hold their data. Return a list of just the executors for which there + * will be enough space. Executors are included smallest first. + * + * This is a best guess implementation and it is not guaranteed that all returned executors + * will succeed. For example a block might be too big to fit on any one specific executor. + * + * @param execIds executors which will be shut down + * @
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r178964472 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -250,6 +255,44 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // As a heuristic, prioritize replicating the latest rdd. If this succeeds, + // CacheRecoveryManager will try to replicate the remaining rdds. --- End diff -- rather than the latest rdd, it would actually make more sense to take advantage of the LRU already in the MemoryStore: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L91 but maybe that is not easy to expose. But I think that also means that the *receiving* end will put the replicated block at the back of that LinkedHashMap, even though it really hasn't been accessed at all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r178968393 --- Diff: core/src/main/scala/org/apache/spark/CacheRecoveryManager.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Failure + +import com.google.common.cache.CacheBuilder + +import org.apache.spark.CacheRecoveryManager.{DoneRecovering, KillReason, Timeout} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.ThreadUtils + +/** + * Responsible for asynchronously replicating all of an executor's cached blocks, and then shutting + * it down. 
+ */ +private class CacheRecoveryManager( +blockManagerMasterEndpoint: RpcEndpointRef, +executorAllocationManager: ExecutorAllocationManager, +conf: SparkConf) + extends Logging { + + private val forceKillAfterS = conf.get(DYN_ALLOCATION_CACHE_RECOVERY_TIMEOUT) + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("cache-recovery-manager-pool") + private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(threadPool) + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("cache-recovery-shutdown-timers") + private val recoveringExecutors = CacheBuilder.newBuilder() --- End diff -- I find `recoveringExecutors` pretty confusing. To me it reads as executors that are recovering from some problem but are going to be OK -- not executors that are about to die, which we are recovering data from. How about `drainingExecutors`? (Though I have a feeling this name may have been discussed in earlier rounds of comments and this is what we settled on ... if so, that's fine.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r178959007 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } --- End diff -- Sorry, my comment was worded very poorly. Is there any reason you wouldn't want to transfer the on-disk blocks? I assume you'd want to replicate all of them, and just needed to change the comments elsewhere. Or is this an intentional tradeoff, since users are more likely to limit in-memory caching to only the most important data? Also, just to be really precise -- the check below isn't whether the block is currently in memory; it's whether it has been requested to be cached in memory. It may have been cached as MEMORY_AND_DISK but currently reside only on disk. Depending on why you want to limit this to in-memory blocks only, that may not be applicable. Maybe you actually want `!useDisk`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20888 Ah OK, yes: when run in isolation, the stage will be 0, so your change makes sense. But that is not what is making it flaky in a full test run. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20888 > if I execute the test on my machine alone it never pass. You mean it never fails on your machine, right? It's only flaky when you run everything on Jenkins? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20888 Hmm, you're right; I was looking at a different branch in my editor and didn't notice that it was reset in the code I linked to on master, oops. I still don't understand your proposed solution, though -- how is checking `stageToKill != -1` better than checking `stageToKill > 0` in this case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20888: [SPARK-23775][TEST] DataFrameRangeSuite should wait for ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20888 I think you're right about killing the wrong stage, but I don't think it's exactly what you've outlined. The original code doesn't try to kill a stage with ID == 0 -- instead it's just waiting until that volatile is set to something > 0, and then proceeds. That seems to work fine; we do see that the stage gets canceled OK once. However, I think the problem is that the test [runs twice, with and without codegen](https://github.com/apache/spark/blob/4d37008c78d7d6b8f8a649b375ecc090700eca4f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala#L165). The first time, it'll always wait until the stage ID is set, because of that `eventually { ... stageToKill > 0}`. However, on the second iteration, `stageToKill` may still be > 0 from the first iteration, not because it's been set by the second iteration. So I think you just need to reset the value to -1 between iterations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20604 @vanzin @sitalkedia @jiangxb1987 I was looking at this code again, and I'd appreciate your thoughts on how this relates to [SPARK-21834](https://issues.apache.org/jira/browse/SPARK-21834) https://github.com/apache/spark/pull/19081 I actually think that SPARK-21834 probably solves the bug I was describing initially. I hit the bug on 2.2.0, and didn't properly understand the change of SPARK-21834 when proposing this change. Nonetheless, I still think this fix is a good one -- it improves code clarity in general and fixes a couple of other minor cases. I'd also link the issues in JIRA etc. so the relationship is clearer. I'd go even further and suggest that with this fix in, we can actually remove SPARK-21834, as it's no longer necessary. It's not harmful, but it's just confusing. Thoughts? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20657 @jerryshao I know you said you wanted to take a deeper look, but it's been a while. Otherwise, I'll merge in the next day or two. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20853: [SPARK-23729][SS] Glob resolution is done without the fr...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20853 Jenkins, ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20770: [SPARK-23626][CORE] DAGScheduler blocked due to JobSubmi...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20770 Took a quick look; I agree with Shivaram's observations. You've got to handle `shuffleIdToMapStage`, which will not be so easy. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20742: [SPARK-23572][docs] Bring "security.md" up to date.
Github user squito commented on the issue: https://github.com/apache/spark/pull/20742 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20742#discussion_r175171592 --- Diff: docs/security.md --- @@ -3,47 +3,291 @@ layout: global displayTitle: Spark Security title: Security --- +* This will become a table of contents (this text will be scraped). +{:toc} -Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate. The shared secret is created as follows: +# Spark RPC -* For Spark on [YARN](running-on-yarn.html) and local deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. -* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. +## Authentication -## Web UI +Spark currently supports authentication for RPC channels using a shared secret. Authentication can +be turned on by setting the `spark.authenticate` configuration parameter. -The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting -and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL settings](security.html#ssl-configuration). +The exact mechanism used to generate and distribute the shared secret is deployment-specific. 
-### Authentication +For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle +generating and distributing the shared secret. Each application will use a unique shared secret. In +the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of +secrets to be secure. -A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable`, `spark.ui.view.acls` and `spark.ui.view.acls.groups` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes. +This secret will be shared by all the daemons and applications, so this deployment configuration is +not as secure as the above, especially when considering multi-tenant clusters. -Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable`, `spark.modify.acls` and `spark.modify.acls.groups`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. -Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. 
is controlled by the configs `spark.admin.acls` and `spark.admin.acls.groups`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications. + +Property NameDefaultMeaning + + spark.authenticate + false + Whether Spark authenticates its internal connections. + + + spark.authenticate.secret + None + +The secret key used authentication. See above for when this configuration should be set. + + + -## Event Logging +## Encryption -If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should
[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20742#discussion_r175173523 --- Diff: docs/security.md --- @@ -182,54 +582,70 @@ configure those ports. -### HTTP Security Headers -Apache Spark can be configured to include HTTP Headers which aids in preventing Cross -Site Scripting (XSS), Cross-Frame Scripting (XFS), MIME-Sniffing and also enforces HTTP -Strict Transport Security. +# Kerberos + +Spark supports submitting applications in environments that use Kerberos for authentication. +In most cases, Spark relies on the credentials of the current logged in user when authenticating +to Kerberos-aware services. Such credentials can be obtained by logging in to the configured KDC +with tools like `kinit`. + +When talking to Hadoop-based services, Spark needs to obtain delegation tokens so that non-local +processes can authenticate. Spark ships with support for HDFS and other Hadoop file systems, Hive +and HBase. + +When using a Hadoop filesystem (such HDFS or WebHDFS), Spark will acquire the relevant tokens +for the service hosting the user's home directory. + +An HBase token will be obtained if HBase is in the application's classpath, and the HBase +configuration has Kerberos authentication turned (`hbase.security.authentication=kerberos`). + +Similarly, a Hive token will be obtained if Hive is in the classpath, and the configuration includes +a URIs for remote metastore services (`hive.metastore.uris` is not empty). --- End diff -- nit: either "a URI" or "URIs" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20742: [SPARK-23572][docs] Bring "security.md" up to dat...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20742#discussion_r175170426 --- Diff: docs/security.md --- @@ -3,47 +3,291 @@ layout: global displayTitle: Spark Security title: Security --- +* This will become a table of contents (this text will be scraped). +{:toc} -Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate. The shared secret is created as follows: +# Spark RPC -* For Spark on [YARN](running-on-yarn.html) and local deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. -* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. +## Authentication -## Web UI +Spark currently supports authentication for RPC channels using a shared secret. Authentication can +be turned on by setting the `spark.authenticate` configuration parameter. -The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting -and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via [SSL settings](security.html#ssl-configuration). +The exact mechanism used to generate and distribute the shared secret is deployment-specific. 
-### Authentication +For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle +generating and distributing the shared secret. Each application will use a unique shared secret. In +the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of +secrets to be secure. -A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable`, `spark.ui.view.acls` and `spark.ui.view.acls.groups` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes. +This secret will be shared by all the daemons and applications, so this deployment configuration is +not as secure as the above, especially when considering multi-tenant clusters. -Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable`, `spark.modify.acls` and `spark.modify.acls.groups`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. -Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. 
is controlled by the configs `spark.admin.acls` and `spark.admin.acls.groups`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications. + +Property NameDefaultMeaning + + spark.authenticate + false + Whether Spark authenticates its internal connections. + + + spark.authenticate.secret + None + +The secret key used authentication. See above for when this configuration should be set. + + + -## Event Logging +## Encryption -If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user squito commented on the issue: https://github.com/apache/spark/pull/19041 Thanks @brad-kaiser -- I want to reiterate my comment from Feb 2nd; I think that is really the most important part to address before getting into the details of the current implementation: > Thought some more about the race between RemoveBlock getting sent back from the executor vs when the CacheRecoveryManager tries to replicate the next block -- actually why is there the back-and-forth with the driver for every block? Why isn't there just one message from the CacheRecoveryManager to the executor, saying "Drain all RDD blocks" and then one message from the executor back to the driver when its done? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19041: [SPARK-21097][CORE] Add option to recover cached ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r175164254 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala --- @@ -246,6 +251,38 @@ class BlockManagerMasterEndpoint( blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) } + private def recoverLatestRDDBlock( + execId: String, + excludeExecutors: Seq[String], + context: RpcCallContext): Unit = { +logDebug(s"Replicating first cached block on $execId") +val excluded = excludeExecutors.flatMap(blockManagerIdByExecutor.get) +val response: Option[Future[Boolean]] = for { + blockManagerId <- blockManagerIdByExecutor.get(execId) + info <- blockManagerInfo.get(blockManagerId) + blocks = info.cachedBlocks.collect { case r: RDDBlockId => r } + // we assume blocks from the latest rdd are most relevant --- End diff -- Right, I'm suggesting that you expand the *comment* to be what I had written above, so it's a little easier to follow the logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20685: [SPARK-23524] Big local shuffle blocks should not be che...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20685 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20685: [SPARK-23524] Big local shuffle blocks should not be che...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20685 I agree with @cloud-fan . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20657 BTW, I took a look at the code in `MesosHadoopDelegationTokenManager`; there seems to be a lot of duplication that could probably be factored out, and I wonder whether the things that are different really should be the same. E.g., Mesos doesn't expose a conf for KERBEROS_RELOGIN; it's just using the renewal time from the delegation tokens. That seems pretty easy to get wrong. I can open a separate ticket for that, but wanted to see if this makes sense first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172954706 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -93,11 +93,24 @@ private[spark] class Client( private val distCacheMgr = new ClientDistributedCacheManager() - private var loginFromKeytab = false - private var principal: String = null - private var keytab: String = null - private var credentials: Credentials = null - private var amKeytabFileName: String = null + private val principal = sparkConf.get(PRINCIPAL).orNull + private val keytab = sparkConf.get(KEYTAB).orNull + private val loginFromKeytab = principal != null + private val amKeytabFileName: String = { +require((principal == null) == (keytab == null), + "Both principal and keytab must be defined, or neither.") +if (loginFromKeytab) { + logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab") + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + new File(keytab).getName() + "-" + UUID.randomUUID().toString +} else { + null +} + } + + // Defensive copy of the credentials + private val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) --- End diff -- this appears to be unused. did you mean to use this in `setupSecurityToken()`? not really sure what you're defending against with the copy, perhaps that should go in the comment as well ... I see it was just in the old code though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172963955 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security import java.security.PrivilegedExceptionAction import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils import org.apache.spark.util.ThreadUtils /** - * The following methods are primarily meant to make sure long-running apps like Spark - * Streaming apps can run without interruption while accessing secured services. The - * scheduleLoginFromKeytab method is called on the AM to get the new credentials. - * This method wakes up a thread that logs into the KDC - * once 75% of the renewal interval of the original credentials used for the container - * has elapsed. It then obtains new credentials and writes them to HDFS in a - * pre-specified location - the prefix of which is specified in the sparkConf by - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc. 
- * - each update goes to a new file, with a monotonically increasing suffix), also the - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater. - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed. + * A manager tasked with periodically updating delegation tokens needed by the application. * - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is - * called once 80% of the validity of the original credentials has elapsed. At that time the - * executor finds the credentials file with the latest timestamp and checks if it has read those - * credentials before (by keeping track of the suffix of the last file it read). If a new file has - * appeared, it will read the credentials and update the currently running UGI with it. This - * process happens again once 80% of the validity of this has expired. + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. --- End diff -- for folks like me less familiar with this, this seems like a good spot to explain the overall flow a little bit more. Eg. The KDC provides a ticket granting ticket (tgt), which is then used to obtain delegation tokens for each service. The KDC does not expose the tgt's expiry time, so renewal is controlled by a conf (by default 1m, much more frequent than usual expiry times). Each providers delegation token provider should determine the expiry time of the delegation token, so they can be renewed appropriately. 
(in particular I needed an extra read to figure out why the tgt had its own renewal mechanism) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20685: [SPARK-23524] Big local shuffle blocks should not be che...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20685 It'll also help with disk corruption ... from the stack traces in SPARK-4105 you can't really tell what the source of the problem is. It'll be pretty hard to determine the source of corruption if we start seeing it again. Anyway, I don't feel that strongly about it either way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172581601 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging { private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { UserGroupInformation.setConfiguration(newConfiguration(sparkConf)) val creds = deserialize(tokens) -logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}") +logInfo("Updating delegation tokens for current user.") --- End diff -- yeah I was thinking it might be handy to have it logged in the executors and driver as well, sort of as an RPC id, so you could correlate the log lines, in case there was ever a delay in propagation or a failure to get to one executor or something, since you're choosing to always log something here. Still, your call. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172579936 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -520,4 +520,16 @@ package object config { .checkValue(v => v > 0, "The threshold should be positive.") .createWithDefault(1000) + private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO = +ConfigBuilder("spark.security.credentials.renewalRatio") + .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.") + .doubleConf + .createWithDefault(0.75d) + + private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT = +ConfigBuilder("spark.security.credentials.retryWait") + .doc("How long to wait before retrying to fetch new credentials after a failure.") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("1h") --- End diff -- I thought that is what internal meant ... a user *could* specify them, but we don't document them at all, so not a stable part of the api etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20685#discussion_r172579121 --- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala --- @@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT intercept[FetchFailedException] { iterator.next() } } + test("big corrupt blocks will not be retiried") { +val corruptStream = mock(classOf[InputStream]) +when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt")) +val corruptBuffer = mock(classOf[ManagedBuffer]) +when(corruptBuffer.createInputStream()).thenReturn(corruptStream) +doReturn(1L).when(corruptBuffer).size() + +val blockManager = mock(classOf[BlockManager]) +val localBmId = BlockManagerId("test-client", "test-client", 1) +doReturn(localBmId).when(blockManager).blockManagerId + doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0)) +val localBlockLengths = Seq[Tuple2[BlockId, Long]]( + ShuffleBlockId(0, 0, 0) -> corruptBuffer.size() +) + +val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) +val remoteBlockLengths = Seq[Tuple2[BlockId, Long]]( + ShuffleBlockId(0, 1, 0) -> corruptBuffer.size() +) + +val transfer = mock(classOf[BlockTransferService]) +when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())) --- End diff -- sorry my comment was vague -- I *do* think you can use `createMockTransfer` here, since that helper method already exists. I was just thinking that there may be more we could clean up -- setting up the local & remote BlockManager Id, creating the ShuffleIterator, etc. seems to have a lot of boilerplate in all the tests. But let's not to do a pure refactoring to the other tests in this change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20740: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statist...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20740 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172319244 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security import java.security.PrivilegedExceptionAction import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils import org.apache.spark.util.ThreadUtils /** - * The following methods are primarily meant to make sure long-running apps like Spark - * Streaming apps can run without interruption while accessing secured services. The - * scheduleLoginFromKeytab method is called on the AM to get the new credentials. - * This method wakes up a thread that logs into the KDC - * once 75% of the renewal interval of the original credentials used for the container - * has elapsed. It then obtains new credentials and writes them to HDFS in a - * pre-specified location - the prefix of which is specified in the sparkConf by - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc. 
- * - each update goes to a new file, with a monotonically increasing suffix), also the - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater. - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed. + * A manager tasked with periodically updating delegation tokens needed by the application. * - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is - * called once 80% of the validity of the original credentials has elapsed. At that time the - * executor finds the credentials file with the latest timestamp and checks if it has read those - * credentials before (by keeping track of the suffix of the last file it read). If a new file has - * appeared, it will read the credentials and update the currently running UGI with it. This - * process happens again once 80% of the validity of this has expired. + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. + * + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are both added to the current user, and also sent to the Spark driver + * once it's registered with the AM. The driver is tasked with distributing the tokens to other + * processes that might need them. 
*/ private[yarn] class AMCredentialRenewer( sparkConf: SparkConf, -hadoopConf: Configuration, -credentialManager: YARNHadoopDelegationTokenManager) extends Logging { +hadoopConf: Configuration) extends Logging { - private var lastCredentialsFileSuffix = 0 + private val principal = sparkConf.get(PRINCIPAL).get + private val keytab = sparkConf.get(KEYTAB).get + private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) - private val credentialRenewerThread: ScheduledExecutorService = + private val renewalExecutor: ScheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") - private val hadoopUtil = SparkHadoopUtil.get + private val driverRef = new AtomicReference[RpcEndpointRef]() - private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) - private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) - private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT) - private val freshHadoopConf = -hadoopUtil.getConfBypass
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172322650 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -520,4 +520,16 @@ package object config { .checkValue(v => v > 0, "The threshold should be positive.") .createWithDefault(1000) + private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO = +ConfigBuilder("spark.security.credentials.renewalRatio") + .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.") + .doubleConf + .createWithDefault(0.75d) + + private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT = +ConfigBuilder("spark.security.credentials.retryWait") + .doc("How long to wait before retrying to fetch new credentials after a failure.") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("1h") --- End diff -- `.internal()` for both --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172323592 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -18,221 +18,156 @@ package org.apache.spark.deploy.yarn.security import java.security.PrivilegedExceptionAction import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicReference import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.ui.UIUtils import org.apache.spark.util.ThreadUtils /** - * The following methods are primarily meant to make sure long-running apps like Spark - * Streaming apps can run without interruption while accessing secured services. The - * scheduleLoginFromKeytab method is called on the AM to get the new credentials. - * This method wakes up a thread that logs into the KDC - * once 75% of the renewal interval of the original credentials used for the container - * has elapsed. It then obtains new credentials and writes them to HDFS in a - * pre-specified location - the prefix of which is specified in the sparkConf by - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc. 
- * - each update goes to a new file, with a monotonically increasing suffix), also the - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater. - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed. + * A manager tasked with periodically updating delegation tokens needed by the application. * - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is - * called once 80% of the validity of the original credentials has elapsed. At that time the - * executor finds the credentials file with the latest timestamp and checks if it has read those - * credentials before (by keeping track of the suffix of the last file it read). If a new file has - * appeared, it will read the credentials and update the currently running UGI with it. This - * process happens again once 80% of the validity of this has expired. + * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run + * without interruption while accessing secured services. It periodically logs in to the KDC with + * user-provided credentials, and contacts all the configured secure services to obtain delegation + * tokens to be distributed to the rest of the application. + * + * New delegation tokens are created once 75% of the renewal interval of the original tokens has + * elapsed. The new tokens are both added to the current user, and also sent to the Spark driver + * once it's registered with the AM. The driver is tasked with distributing the tokens to other + * processes that might need them. 
*/ private[yarn] class AMCredentialRenewer( sparkConf: SparkConf, -hadoopConf: Configuration, -credentialManager: YARNHadoopDelegationTokenManager) extends Logging { +hadoopConf: Configuration) extends Logging { - private var lastCredentialsFileSuffix = 0 + private val principal = sparkConf.get(PRINCIPAL).get + private val keytab = sparkConf.get(KEYTAB).get + private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf) - private val credentialRenewerThread: ScheduledExecutorService = + private val renewalExecutor: ScheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") - private val hadoopUtil = SparkHadoopUtil.get + private val driverRef = new AtomicReference[RpcEndpointRef]() - private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) - private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) - private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT) - private val freshHadoopConf = -hadoopUtil.getConfBypass
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172325576 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala --- @@ -144,7 +145,8 @@ class SparkHadoopUtil extends Logging { private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { UserGroupInformation.setConfiguration(newConfiguration(sparkConf)) val creds = deserialize(tokens) -logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}") +logInfo("Updating delegation tokens for current user.") --- End diff -- just a thought -- rather than just serializing the Credentials, would it be helpful to serialize a timestamp when the tokens were obtained and when they will be refreshed as well, so it could be logged here? you have spent more time debugging cases with problems so you will probably have a better idea if that would be helpful --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r172321966 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1009,7 +987,7 @@ private[spark] class Client( } def setupCredentials(): Unit = { -loginFromKeytab = sparkConf.contains(PRINCIPAL.key) +loginFromKeytab = sparkConf.contains(PRINCIPAL) --- End diff -- if a user only specifies keytab, but no principal, I don't think this will fail in a friendly way. This will be a no-op, so it'll succeed, and then in ApplicationMaster / AMCredentialRenewer, you'll get an error trying to do `sparkConf.get(PRINCIPAL).get`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20685#discussion_r172226910 --- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala --- @@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT intercept[FetchFailedException] { iterator.next() } } + test("big corrupt blocks will not be retiried") { --- End diff -- typo: retried (or maybe "retired", not sure) though I think a better name would be "big blocks are not checked for corruption" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20685: [SPARK-23524] Big local shuffle blocks should not...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20685#discussion_r172232973 --- Diff: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala --- @@ -352,6 +352,63 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT intercept[FetchFailedException] { iterator.next() } } + test("big corrupt blocks will not be retiried") { +val corruptStream = mock(classOf[InputStream]) +when(corruptStream.read(any(), any(), any())).thenThrow(new IOException("corrupt")) +val corruptBuffer = mock(classOf[ManagedBuffer]) +when(corruptBuffer.createInputStream()).thenReturn(corruptStream) +doReturn(1L).when(corruptBuffer).size() + +val blockManager = mock(classOf[BlockManager]) +val localBmId = BlockManagerId("test-client", "test-client", 1) +doReturn(localBmId).when(blockManager).blockManagerId + doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0)) +val localBlockLengths = Seq[Tuple2[BlockId, Long]]( + ShuffleBlockId(0, 0, 0) -> corruptBuffer.size() +) + +val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2) +val remoteBlockLengths = Seq[Tuple2[BlockId, Long]]( + ShuffleBlockId(0, 1, 0) -> corruptBuffer.size() +) + +val transfer = mock(classOf[BlockTransferService]) +when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())) --- End diff -- you can reuse `createMockTransfer` to simplify this a little. (actually, a bunch of this test code looks like it could be refactored across these tests -- but we can leave that out of this change.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170383918 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala --- @@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager kill the specified executors. * - * When asking the executor to be replaced, the executor loss is considered a failure, and - * killed tasks that are running on the executor will count towards the failure limits. If no - * replacement is being requested, then the tasks will not count towards the limit. - * * @param executorIds identifiers of executors to kill - * @param replace whether to replace the killed executors with new ones, default false + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been killed + * @param countFailures if there are tasks running on the executors when they are killed, whether --- End diff -- whoops, I was supposed to set `countFailures = true` in `sc.killAndReplaceExecutors`, thanks for catching that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20645: SPARK-23472: Add defaultJavaOptions for drivers and exec...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20645 I agree it would be nicer to have this be a more general feature. I would prefer an approach which didn't require a different configuration name, simply because it's more to document & for users to keep track of. An alternative would be to have another syntax for appending to an option, perhaps ++= like Scala, e.g. "--conf spark.executor.extraJavaOptions++=..." You'd still need to tag the ConfigBuilder with what separator to use to merge the values together. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20652: [SPARK-23476][CORE] Generate secret in local mode when a...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20652 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 sure @skonto, great to have somebody more knowledgeable on mesos taking a closer look at this. sorry @IgorBerman I promised a quick fix here, but have realized this is more complicated than we originally thought. but the discussion is moving forward at least, so thanks for driving it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 cc @attilapiros , you may be interested b/c of how this relates to SPARK-16630 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 ok hmm ... so actually this change would lose some important functionality then. unfortunately I don't have a clear picture yet of how to solve SPARK-16630 along with the other blacklisting. sorry this might actually need a more general solution, need to think about it a bit more ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 @susanxhuynh good point about changing default behavior. I'd rather have the change so we have more unified behavior between mesos and other cluster managers. But I have never run spark on mesos or even had much interaction with users of spark on mesos, so I will defer to others' judgement. Another option: we could leave the old behavior, unless a user sets spark.blacklist.enabled=true. It's a little wonky, but that also guarantees you always get some blacklisting. I've also been considering turning blacklisting on by default in spark 2.4. So far I've had good feedback from users running it (though we'll get way more feedback when it's on by default). btw, one hole in the general blacklisting is handling cases when the executors fail to even start: https://issues.apache.org/jira/browse/SPARK-16630 would that be covered by mesos with the old code? just want to make sure we aren't losing that ability. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 thanks @IgorBerman, description looks fine to me now, maybe I saw it wrong before. your test sounds pretty good to me ... you could turn on debug logging for MesosCoarseGrainedSchedulerBackend and look for these log msgs: https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L603 What do you mean "it didn't have much effect" -- sounds like it did exactly the right thing? Sorry, I don't really understand the description of the other bug you mentioned. Why shouldn't it start a 2nd executor on the same slave for the same application? That seems fine until you have enough failures for the node blacklisting to take effect. There is also a small race (that is relatively benign) that would allow you to get an executor on a node which you are in the middle of blacklisting. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20604 Jenkins, retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20604 known flaky test: https://issues.apache.org/jira/browse/SPARK-23458 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20635: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20635 thanks @ivoson , merged! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17619: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/17619 for anyone watching this: @IgorBerman submitted an updated version of this here https://github.com/apache/spark/pull/20640 which I plan to merge unless there are any objections. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 lgtm @IgorBerman can you clean up the PR description a little? headers got duplicated. And I'd reword a bit to something like > This updates the Mesos scheduler to integrate with the common logic for node-blacklisting across all cluster managers in BlacklistTracker. Specifically, it removes a hardcoded MAX_SLAVE_FAILURES = 2 in MesosCoarseGrainedSchedulerBackend, and uses the blacklist from the BlacklistTracker, as Yarn does. > This closes https://github.com/apache/spark/pull/17619 (the last "this closes" bit is useful for some tools we have; it will close the other one when this is merged.) thanks for doing this. I will probably leave this open for a bit in case more mesos users have thoughts --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17619: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17619#discussion_r169458952 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -484,7 +481,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors() < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && --- End diff -- thanks @skonto , also I realized there was more explicit calls to declineOffer than I thought initially after a closer read of the code. btw @IgorBerman has opened an updated version of this PR here https://github.com/apache/spark/pull/20640 -- would appreciate a review over there --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169438419 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager( val executorsRemoved = if (testing) { executorIdsToBeRemoved } else { - client.killExecutors(executorIdsToBeRemoved) + // We don't want to change our target number of executors, because we already did that + // when the task backlog decreased. Normally there wouldn't be any tasks running on these + // executors, but maybe the scheduler *just* decided to run a task there -- in that case, --- End diff -- good point, I didn't look closely enough at the semantics of `force` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169437530 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging { def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: ExecutorAllocationClient => -b.killExecutors(executorIds, replace = false, force = true).nonEmpty +require(executorAllocationManager.isEmpty, --- End diff -- What would calling this mean with dynamic allocation on? Note this api explicitly says its meant to adjust resource usage downwards. If you've got just one executor, and then you kill it, should your app sit with 0 executors? Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up? I can't think of useful clear semantics for this (though this is not necessary to fix the bug, I could pull this out and move to a discussion in a new jira) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r169436456 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager( // If the new target has not changed, avoid sending a message to the cluster manager if (numExecutorsTarget < oldNumExecutorsTarget) { +// We lower the target number of executors but don't actively kill any yet. We do this --- End diff -- I was trying to answer a different question -- if we don't kill the executor now, why even bother lowering the target number? as that would be an alternative solution -- don't adjust the target number here at all, just wait until you kill the executors for being idle. (and really I'm just guessing at the logic.) lemme try to reword this some ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r169413261 --- Diff: resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala --- @@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite verifyTaskLaunched(driver, "o2") } + test("mesos declines offers from blacklisted slave") { +setBackend() + +// launches a task on a valid offer on slave s0 +val minMem = backend.executorMemory(sc) + 1024 +val minCpu = 4 +val offer1 = Resources(minMem, minCpu) +offerResources(List(offer1)) +verifyTaskLaunched(driver, "o1") + +// for any reason executor(aka mesos task) failed on s0 +val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED) +backend.statusUpdate(driver, status) +when(taskScheduler.nodeBlacklist()).thenReturn(Set("s1")) + +val offer2 = Resources(minMem, minCpu) +// Launches a new task on a valid offer from the same slave +offerResources(List(offer2)) +// but since it's blacklisted the offer is declined +verifyDeclinedOffer(driver, createOfferId("o1")) --- End diff -- ah nevermind. I took another look at the code and now I see how this works --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 thanks for updating. can you also update the PR description? yeah it's fine to just update this one. You can't in general update others' PRs, unless they give you push permissions to their repos. You *can* start from their branch, and then add your changes on top -- that is a little preferable as that way the commit history includes their work, so when someone merges it's a bit more obvious. If you can adjust this PR that way, that would be nice -- but otherwise it's OK, I will try to remember to adjust attribution when merging --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r169391079 --- Diff: resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala --- @@ -108,6 +108,28 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite verifyTaskLaunched(driver, "o2") } + test("mesos declines offers from blacklisted slave") { +setBackend() + +// launches a task on a valid offer on slave s0 +val minMem = backend.executorMemory(sc) + 1024 +val minCpu = 4 +val offer1 = Resources(minMem, minCpu) +offerResources(List(offer1)) +verifyTaskLaunched(driver, "o1") + +// for any reason executor(aka mesos task) failed on s0 +val status = createTaskStatus("0", "s1", TaskState.TASK_FAILED) +backend.statusUpdate(driver, status) +when(taskScheduler.nodeBlacklist()).thenReturn(Set("s1")) + +val offer2 = Resources(minMem, minCpu) +// Launches a new task on a valid offer from the same slave +offerResources(List(offer2)) +// but since it's blacklisted the offer is declined +verifyDeclinedOffer(driver, createOfferId("o1")) --- End diff -- will this actually pass? I thought it wouldn't b/c the filtering is done inside `buildMesosTasks`, which never calls `declineOffer` on offers that fail `canLaunchTask`. (a separate thing which needs fixing -- you could open another issue for that.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 I understand if you want to do something like this for yourself to unblock, but I think I'm -1 on merging this because of adding more configs just for a stopgap. but I think we agree on the right solution here -- if you post that I will try to review promptly since I've got this paged in (though it might take a bit to merge as we figure out how to test ...) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 @IgorBerman I actually think that https://github.com/apache/spark/pull/17619 is the right approach. As @timout pointed out on that one, this functionality doesn't need to be covered in mesos specific code at all, as it's covered by the BlacklistTracker. I don't like introducing new configs when we don't really need them. Other than this being less invasive, is there another advantage here? The modifications I suggested to that PR are relatively small -- I think it's fine if you want to open a PR that is the original updated with my suggestions (credit still to @timout), as I'm not sure if they're still working on it. (my fault too as there was such a long delay for a proper review.) While I had some open questions, I think it's a clear improvement in any case. I just need to get a little help on mesos testing, we can ask on the dev list. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20640: [SPARK-19755][Mesos] Blacklist is always active for Meso...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20640 Jenkins, ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20635: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20635 Jenkins, Ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20635: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20635 lgtm assuming tests pass --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20604 @tgravescs @vanzin @zsxwing could you take a look? thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20424: [Spark-23240][python] Better error message when extraneo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20424 still lgtm, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20623: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...
Github user squito closed the pull request at: https://github.com/apache/spark/pull/20623 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Executor I...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20601 ack I merged to master but screwed up on 2.3 -- fixing that here: https://github.com/apache/spark/pull/20623 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20623: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/20623 [SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stag… …e page ## What changes were proposed in this pull request? Fixing exception got at sorting tasks by Host / Executor ID: ``` java.lang.IllegalArgumentException: Invalid sort column: Host at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017) at org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694) at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61) at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96) at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708) at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293) at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282) at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82) at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82) at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90) at javax.servlet.http.HttpServlet.service(HttpServlet.java:687) at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848) at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584) ``` Moreover some refactoring to avoid similar problems by introducing constants for each header name and reusing them at the identification of the corresponding sorting index. ## How was this patch tested? Manually: ![screen shot 2018-02-13 at 18 57 10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png) Author: “attilapiros” <piros.attila.zs...@gmail.com> Closes #20601 from attilapiros/SPARK-23413. 
(cherry picked from commit 1dc2c1d5e85c5f404f470aeb44c1f3c22786bdea) You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark fix_backport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20623.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20623 commit f7a22827694a3aa92e8a7dd20195e2895e86880a Author: “attilapiros” <piros.attila.zsolt@...> Date: 2018-02-15T19:51:24Z [SPARK-23413][UI] Fix sorting tasks by Host / Executor ID at the Stage page ## What changes were proposed in this pull request? Fixing exception got at sorting tasks by Host / Executor ID: ``` java.lang.IllegalArgumentException: Invalid sort column: Host at org.apache.spark.ui.jobs.ApiHelper$.indexName(StagePage.scala:1017) at org.apache.spark.ui.jobs.TaskDataSource.sliceData(StagePage.scala:694) at org.apache.spark.ui.PagedDataSource.pageData(PagedTable.scala:61) at org.apache.spark.ui.PagedTable$class.table(PagedTable.scala:96) at org.apache.spark.ui.jobs.TaskPagedTable.table(StagePage.scala:708) at org.apache.spark.ui.jobs.StagePage.liftedTree1$1(StagePage.scala:293) at org.apache.spark.ui.jobs.StagePage.render(StagePage.scala:282) at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82) at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82) at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90) at javax.servlet.http.HttpServlet.service(HttpServlet.java:687) at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848) at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584) ``` Moreover some refactoring to avoid similar problems by introducing constants for each header name and reusing them at the identification of the corresponding sorting index. 
## How was this patch tested? Manually: ![screen shot 2018-02-13 at 18 57 10](https://user-images.githubusercontent.com/2017933/36166532-1cfdf3b8-10f3-11e8-8d32-5fcaad2af214.png) Author: “attilapiros” <piros.attila.zs...@gmail.com> Closes #20601 from attilapiros/SPARK-23413. (cherry picked from commit 1dc2c1d5e85c5f404f470aeb44c1f3c22786bdea) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Executor I...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20601 Everything that might have changed from this has passed, the failures are known flaky tests: https://issues.apache.org/jira/browse/SPARK-23369 https://issues.apache.org/jira/browse/SPARK-23390 merging to master / 2.3 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17619: [SPARK-19755][Mesos] Blacklist is always active f...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17619#discussion_r168509141 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -484,7 +481,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors() < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && --- End diff -- rather than just deleting this, we should replace it with a check to `scheduler.nodeBlacklist()`, like the YarnScheduler is doing here: https://github.com/apache/spark/blob/44e20c42254bc6591b594f54cd94ced5fcfadae3/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L128 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...
Github user squito commented on the issue: https://github.com/apache/spark/pull/17208 hmm I think you're right @zsxwing that we should be updating `isZombie` before `sched.dagScheduler.taskEnded` and `sched.dagScheduler.taskSetFailed` are called, just to keep state consistent. I don't think you'll actually hit the bug described here, as (a) if it was from a fetch failure, `isZombie` is already set first or if (b) it's just a regular task failure, and it leads to the stage getting aborted, then there aren't any more retries of the stage anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20601#discussion_r168313710 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala --- @@ -963,33 +965,60 @@ private[ui] class TaskPagedTable( private object ApiHelper { + val HEADER_ID = "ID" + val HEADER_TASK_INDEX = "Index" + val HEADER_ATTEMPT = "Attempt" + val HEADER_STATUS = "Status" + val HEADER_LOCALITY = "Locality Level" + val HEADER_EXECUTOR = "Executor ID" + val HEADER_HOST = "Host" + val HEADER_LAUNCH_TIME = "Launch Time" + val HEADER_DURATION = "Duration" + val HEADER_SCHEDULER_DELAY = "Scheduler Delay" + val HEADER_DESER_TIME = "Task Deserialization Time" + val HEADER_GC_TIME = "GC Time" + val HEADER_SER_TIME = "Result Serialization Time" + val HEADER_GETTING_RESULT_TIME = "Getting Result Time" + val HEADER_PEAK_MEM = "Peak Execution Memory" + val HEADER_ACCUMULATORS = "Accumulators" + val HEADER_INPUT_SIZE = "Input Size / Records" + val HEADER_OUTPUT_SIZE = "Output Size / Records" + val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time" + val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records" + val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads" + val HEADER_SHUFFLE_WRITE_TIME = "Write Time" + val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records" + val HEADER_MEM_SPILL = "Shuffle Spill (Memory)" + val HEADER_DISK_SPILL = "Shuffle Spill (Disk)" + val HEADER_ERROR = "Errors" private val COLUMN_TO_INDEX = Map( -"ID" -> null.asInstanceOf[String], -"Index" -> TaskIndexNames.TASK_INDEX, -"Attempt" -> TaskIndexNames.ATTEMPT, -"Status" -> TaskIndexNames.STATUS, -"Locality Level" -> TaskIndexNames.LOCALITY, -"Executor ID / Host" -> TaskIndexNames.EXECUTOR, -"Launch Time" -> TaskIndexNames.LAUNCH_TIME, -"Duration" -> TaskIndexNames.DURATION, -"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY, -"Task Deserialization Time" -> TaskIndexNames.DESER_TIME, -"GC Time" -> TaskIndexNames.GC_TIME, -"Result Serialization Time" -> 
TaskIndexNames.SER_TIME, -"Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME, -"Peak Execution Memory" -> TaskIndexNames.PEAK_MEM, -"Accumulators" -> TaskIndexNames.ACCUMULATORS, -"Input Size / Records" -> TaskIndexNames.INPUT_SIZE, -"Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE, -"Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME, -"Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS, -"Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS, -"Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME, -"Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE, -"Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL, -"Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL, -"Errors" -> TaskIndexNames.ERROR) +HEADER_ID -> null.asInstanceOf[String], +HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX, +HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT, +HEADER_STATUS -> TaskIndexNames.STATUS, +HEADER_LOCALITY -> TaskIndexNames.LOCALITY, +HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR, +HEADER_HOST -> TaskIndexNames.EXECUTOR, --- End diff -- or even go back to the 2.2 behavior, with executor & host in the same column. I do think having a separate column for host, and having it be sortable, is actually better ... but just trying to think of simple solutions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20601#discussion_r168211371 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala --- @@ -963,33 +965,60 @@ private[ui] class TaskPagedTable( private object ApiHelper { + val HEADER_ID = "ID" + val HEADER_TASK_INDEX = "Index" + val HEADER_ATTEMPT = "Attempt" + val HEADER_STATUS = "Status" + val HEADER_LOCALITY = "Locality Level" + val HEADER_EXECUTOR = "Executor ID" + val HEADER_HOST = "Host" + val HEADER_LAUNCH_TIME = "Launch Time" + val HEADER_DURATION = "Duration" + val HEADER_SCHEDULER_DELAY = "Scheduler Delay" + val HEADER_DESER_TIME = "Task Deserialization Time" + val HEADER_GC_TIME = "GC Time" + val HEADER_SER_TIME = "Result Serialization Time" + val HEADER_GETTING_RESULT_TIME = "Getting Result Time" + val HEADER_PEAK_MEM = "Peak Execution Memory" + val HEADER_ACCUMULATORS = "Accumulators" + val HEADER_INPUT_SIZE = "Input Size / Records" + val HEADER_OUTPUT_SIZE = "Output Size / Records" + val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time" + val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records" + val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads" + val HEADER_SHUFFLE_WRITE_TIME = "Write Time" + val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records" + val HEADER_MEM_SPILL = "Shuffle Spill (Memory)" + val HEADER_DISK_SPILL = "Shuffle Spill (Disk)" + val HEADER_ERROR = "Errors" private val COLUMN_TO_INDEX = Map( -"ID" -> null.asInstanceOf[String], -"Index" -> TaskIndexNames.TASK_INDEX, -"Attempt" -> TaskIndexNames.ATTEMPT, -"Status" -> TaskIndexNames.STATUS, -"Locality Level" -> TaskIndexNames.LOCALITY, -"Executor ID / Host" -> TaskIndexNames.EXECUTOR, -"Launch Time" -> TaskIndexNames.LAUNCH_TIME, -"Duration" -> TaskIndexNames.DURATION, -"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY, -"Task Deserialization Time" -> TaskIndexNames.DESER_TIME, -"GC Time" -> TaskIndexNames.GC_TIME, -"Result Serialization Time" -> 
TaskIndexNames.SER_TIME, -"Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME, -"Peak Execution Memory" -> TaskIndexNames.PEAK_MEM, -"Accumulators" -> TaskIndexNames.ACCUMULATORS, -"Input Size / Records" -> TaskIndexNames.INPUT_SIZE, -"Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE, -"Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME, -"Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS, -"Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS, -"Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME, -"Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE, -"Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL, -"Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL, -"Errors" -> TaskIndexNames.ERROR) +HEADER_ID -> null.asInstanceOf[String], +HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX, +HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT, +HEADER_STATUS -> TaskIndexNames.STATUS, +HEADER_LOCALITY -> TaskIndexNames.LOCALITY, +HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR, +HEADER_HOST -> TaskIndexNames.EXECUTOR, --- End diff -- another alternative is to disable sorting by host, and just fix sorting by executor. That could go into 2.3.1 without breaking compatibility. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20596: [SPARK-23404][CORE]When the underlying buffers are direc...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20596 Have you seen a case where you actually have an off-heap buffer passed in, though the desired storage is on-heap? E.g. if it comes from the block transfer service then I think it will always be on-heap: https://github.com/apache/spark/blob/2ee76c22b6e48e643694c9475e5f0d37124215e7/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala#L108 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api
Github user squito commented on the issue: https://github.com/apache/spark/pull/20474 merged to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20604: [WIP][SPARK-23365][CORE] Do not adjust num execut...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/20604 [WIP][SPARK-23365][CORE] Do not adjust num executors when killing idle executors. The ExecutorAllocationManager should not adjust the target number of executors when killing idle executors, as it has already adjusted the target number down based on the task backlog. The change is more than just flipping the value of `replace` because `replace` also implied failure handling. Furthermore, the name `replace` was misleading with DynamicAllocation on, as the target number of executors is changed outside of the call to `killExecutors`. TODO: testing You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-23365 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20604.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20604 commit b5a39dab324be4bb358682720cf0f7e55272559d Author: Imran Rashid <irashid@...> Date: 2018-02-13T22:07:26Z [SPARK-23365] Do not adjust num executors when killing idle executors. The ExecutorAllocationManager should not adjust the target number of executors when killing idle executors, as it has already adjusted the target number down based on the task backlog. The change is more than just flipping the value of `replace` because `replace` also implied failure handling. Furthermore, the name `replace` was misleading with DynamicAllocation on, as the target number of executors is changed outside of the call to `killExecutors`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20532: [SPARK-23353][CORE] Allow ExecutorMetricsUpdate events t...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20532 I agree with @jiangxb1987 ... we already have issues with event logs being too big, as the driver gets backlogged even writing them out, and then the history server takes a long time to parse those files. There have been recent improvements to that, but that doesn't mean we should reintroduce the problem. I'm not saying this doesn't have a use, I'd just like to figure out if this is the best way to do it. If it only has one very specific use case for @LantaoJin , then maybe they have an alternative still using public apis, with a custom listener as I suggested. I worry a user might turn this on (why not, more data is better) and then later on hit other scaling challenges and not realize this was the problem. Or if this does have some general use case for all users, then maybe it's fine, but I haven't seen that yet. And maybe there is a better way to do that ... do we need another way to get detailed output metrics from executors, that doesn't have some of the scaling challenges of the event log? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20601: [SPARK-23413][UI] Fix sorting tasks by Host / Exe...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20601#discussion_r167991914 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala --- @@ -963,33 +965,60 @@ private[ui] class TaskPagedTable( private object ApiHelper { + val HEADER_ID = "ID" + val HEADER_TASK_INDEX = "Index" + val HEADER_ATTEMPT = "Attempt" + val HEADER_STATUS = "Status" + val HEADER_LOCALITY = "Locality Level" + val HEADER_EXECUTOR = "Executor ID" + val HEADER_HOST = "Host" + val HEADER_LAUNCH_TIME = "Launch Time" + val HEADER_DURATION = "Duration" + val HEADER_SCHEDULER_DELAY = "Scheduler Delay" + val HEADER_DESER_TIME = "Task Deserialization Time" + val HEADER_GC_TIME = "GC Time" + val HEADER_SER_TIME = "Result Serialization Time" + val HEADER_GETTING_RESULT_TIME = "Getting Result Time" + val HEADER_PEAK_MEM = "Peak Execution Memory" + val HEADER_ACCUMULATORS = "Accumulators" + val HEADER_INPUT_SIZE = "Input Size / Records" + val HEADER_OUTPUT_SIZE = "Output Size / Records" + val HEADER_SHUFFLE_READ_TIME = "Shuffle Read Blocked Time" + val HEADER_SHUFFLE_TOTAL_READS = "Shuffle Read Size / Records" + val HEADER_SHUFFLE_REMOTE_READS = "Shuffle Remote Reads" + val HEADER_SHUFFLE_WRITE_TIME = "Write Time" + val HEADER_SHUFFLE_WRITE_SIZE = "Shuffle Write Size / Records" + val HEADER_MEM_SPILL = "Shuffle Spill (Memory)" + val HEADER_DISK_SPILL = "Shuffle Spill (Disk)" + val HEADER_ERROR = "Errors" private val COLUMN_TO_INDEX = Map( -"ID" -> null.asInstanceOf[String], -"Index" -> TaskIndexNames.TASK_INDEX, -"Attempt" -> TaskIndexNames.ATTEMPT, -"Status" -> TaskIndexNames.STATUS, -"Locality Level" -> TaskIndexNames.LOCALITY, -"Executor ID / Host" -> TaskIndexNames.EXECUTOR, -"Launch Time" -> TaskIndexNames.LAUNCH_TIME, -"Duration" -> TaskIndexNames.DURATION, -"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY, -"Task Deserialization Time" -> TaskIndexNames.DESER_TIME, -"GC Time" -> TaskIndexNames.GC_TIME, -"Result Serialization Time" -> 
TaskIndexNames.SER_TIME, -"Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME, -"Peak Execution Memory" -> TaskIndexNames.PEAK_MEM, -"Accumulators" -> TaskIndexNames.ACCUMULATORS, -"Input Size / Records" -> TaskIndexNames.INPUT_SIZE, -"Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE, -"Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME, -"Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS, -"Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS, -"Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME, -"Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE, -"Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL, -"Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL, -"Errors" -> TaskIndexNames.ERROR) +HEADER_ID -> null.asInstanceOf[String], +HEADER_TASK_INDEX -> TaskIndexNames.TASK_INDEX, +HEADER_ATTEMPT -> TaskIndexNames.ATTEMPT, +HEADER_STATUS -> TaskIndexNames.STATUS, +HEADER_LOCALITY -> TaskIndexNames.LOCALITY, +HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR, +HEADER_HOST -> TaskIndexNames.EXECUTOR, --- End diff -- sorting by host and executor is not the same ... you might have executors 1 & 5 on host A, and execs 2,3,4 on host B. The 2.2 UI had both executor and host in the same column: https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L1203 I think we either need to go back to having one column for both, or add an index on host. thoughts @vanzin ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20424: [Spark-23240][python] Better error message when extraneo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20424 ah got it. sounds good to me, I will defer to @HyukjinKwon 's judgement. I think this change looks fine --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20474: [SPARK-23235][Core] Add executor Threaddump to api
Github user squito commented on the issue: https://github.com/apache/spark/pull/20474 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19788 Jenkins, retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20424: [Spark-23240][python] Better error message when extraneo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20424 lgtm @bersprockets you mentioned wanting to try the other route as well -- what's the status on that? should we still wait on this one? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20474#discussion_r167917613 --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala --- @@ -51,6 +51,29 @@ private[v1] class AbstractApplicationResource extends BaseAppResource { @Path("executors") def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true)) + @GET + @Path("executors/{executorId}/threads") + def threadDump(@PathParam("executorId") execId: String): Array[ThreadStackTrace] = withUI { ui => +if (execId != SparkContext.DRIVER_IDENTIFIER && !execId.forall(Character.isDigit)) { + throw new BadParameterException( +s"Invalid executorId: neither '${SparkContext.DRIVER_IDENTIFIER}' nor number.") +} + +val safeSparkContext = ui.sc.getOrElse { + throw new ServiceUnavailable("Thread dumps not available through the history server.") +} + +ui.store.asOption(ui.store.executorSummary(execId)) match { + case Some(executorSummary) if executorSummary.isActive => + val safeThreadDump = safeSparkContext.getExecutorThreadDump(execId).getOrElse { +throw new NotFoundException("No thread dump is available.") + } + return safeThreadDump --- End diff -- you don't need `return` here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20474: [SPARK-23235][Core] Add executor Threaddump to ap...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20474#discussion_r167917733 --- Diff: docs/monitoring.md --- @@ -347,6 +347,10 @@ can be identified by their `[attempt-id]`. In the API listed below, when running /applications/[app-id]/executors A list of all active executors for the given application. + + /applications/[app-id]/executors/[executor-id]/threads +Stack traces of all the threads running within the given active executor. --- End diff -- Add that this is not available via the history server. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20244: [SPARK-23053][CORE] taskBinarySerialization and task par...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20244 merged to master / 2.3 / 2.2 I hit a merge conflict trying to merge to 2.1 -- feel free to open another PR for that version. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20408 merged to master, thanks everyone --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20532: [SPARK-23353][CORE] Allow ExecutorMetricsUpdate events t...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20532 I can see why you want this sometimes, but I'm trying to figure out if it's really valuable for users in general. You could always add a custom listener to log this info. It would go into a separate file, not the std event log file, which means you'd have a little more work to do to stitch them together. OTOH that could be a good thing, as it means the history server wouldn't have to parse those extra lines. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r167138603 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { --- End diff -- hi @ivoson -- I haven't come up with a better way to test this, so I think for now you should (1) change the PR to *only* include the changes to the DAGScheduler (also undo the `protected[spark]` changes elsewhere) (2) put this repro on the jira as it's pretty good for showing what's going on. if we come up with a way to test it, we can always do that later on. thanks and sorry for the back and forth --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20408: [SPARK-23189][Core][Web UI] Reflect stage level blacklis...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20408 Just a quick note -- I realized I was confused about one part of the inner workings of the history server, which I want to confirm before I merge this, but I got sick and now have a bit of a backlog. I will get back to this next week. Sorry for the delay. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20244#discussion_r166458357 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * In this test, we simulate the scene in concurrent jobs using the same + * rdd which is marked to do checkpoint: + * Job one has already finished the spark job, and start the process of doCheckpoint; + * Job two is submitted, and submitMissingTasks is called. + * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done, + * while part calculates from stage.rdd.partitions is called after doCheckpoint is done, + * we may get a ClassCastException when execute the task because of some rdd will do + * Partition cast. + * + * With this test case, just want to indicate that we should do taskSerialization and + * part calculate in submitMissingTasks with the same rdd checkpoint status. + */ + test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") { --- End diff -- hi @ivoson -- I'm really sorry, but I only just realized that this "test" is really just a repro, and it passes both before and after the actual code changes, since you've replicated the internal logic we're fixing. As such, I don't think it's actually useful as a test case -- perhaps it should get added to the JIRA as a repro. I appreciate the work that went into writing this, as it helped make the issue clear to me. I am not sure if there is a good way to test this. If we can't come up with anything, we should just commit your actual fix, but give me a day or two to think about it ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20451: [SPARK-23146][WIP] Support client mode for Kubern...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20451#discussion_r166087229 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +private[spark] object OptionRequirements { + + def requireBothOrNeitherDefined( --- End diff -- not used, and if it were, perhaps would be clearer as ```scala (opt1, opt2) match { case (Some(val1), None) => ... case (None, Some(val2)) => ... case _ => // ok } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20493: [SPARK-23326][WEBUI]schedulerDelay should return ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20493#discussion_r166068708 --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.Date + +import org.apache.spark.SparkFunSuite +import org.apache.spark.status.api.v1.{TaskData, TaskMetrics} + +class AppStatusUtilsSuite extends SparkFunSuite { + + test("schedulerDelay") { +val runningTask = new TaskData( --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17422#discussion_r165772194 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -429,15 +429,42 @@ private[spark] class Executor( case t: TaskKilledException => logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}") + + // Collect latest accumulator values to report back to the driver + val accums: Seq[AccumulatorV2[_, _]] = +if (task != null) { + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.collectAccumulatorUpdates(taskFailed = true) +} else { + Seq.empty +} + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) --- End diff -- this should be refactored, and not repeated 3 times. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17422: [SPARK-20087] Attach accumulators / metrics to 'T...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17422#discussion_r165772055 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -212,9 +212,19 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. */ @DeveloperApi -case class TaskKilled(reason: String) extends TaskFailedReason { - override def toErrorString: String = s"TaskKilled ($reason)" +case class TaskKilled( +reason: String, +accumUpdates: Seq[AccumulableInfo] = Seq.empty, +private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil) + extends TaskFailedReason { + + override def toErrorString: String = "TaskKilled ($reason)" override def countTowardsTaskFailures: Boolean = false + + private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): TaskKilled = { --- End diff -- I don't think this method is really necessary at all; you could just pass it in the constructor in the places it's used. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17422: [SPARK-20087] Attach accumulators / metrics to 'TaskKill...
Github user squito commented on the issue: https://github.com/apache/spark/pull/17422 @advancedxy this has been quiet for a long time, so I suggest you just take it over. I actually think this is so close to complete that very little would need to be done, and credit would most likely go to @noodle-fb . That said, we may need a little more input on whether or not this is desirable, as it will change the meaning of the aggregated metrics. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org