[GitHub] spark pull request #18093: [WIP][SPARK-20774][SQL] Cancel all jobs when Quer...
Github user liyichao closed the pull request at: https://github.com/apache/spark/pull/18093 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18070: [SPARK-20713][Spark Core] Convert CommitDenied to TaskKi...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18070 Oh, I did not notice that. Since @nlyu is following up, I will close this PR now.
[GitHub] spark pull request #18070: [SPARK-20713][Spark Core] Convert CommitDenied to...
Github user liyichao closed the pull request at: https://github.com/apache/spark/pull/18070
[GitHub] spark issue #18070: [SPARK-20713][Spark Core] Convert CommitDenied to TaskKi...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18070 I will update the PR in a day.
[GitHub] spark issue #18093: [WIP][SPARK-20774][SQL] Cancel all jobs when QueryExecti...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18093 Sorry about that; I will test it when I have time.
[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18092#discussion_r122621671

--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
```
@@ -1281,6 +1286,61 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }

+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
+    val tryAgainMsg = "test_spark_20640_try_again"
+    // a server which delays response 50ms and must try twice for success.
+    def newShuffleServer(port: Int): (TransportServer, Int) = {
+      val attempts = new mutable.HashMap[String, Int]()
+      val handler = new NoOpRpcHandler {
+        override def receive(
+            client: TransportClient,
```
--- End diff --

Oh.
[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18092#discussion_r122620600

--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
```
@@ -1281,6 +1286,59 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }

+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
+    val tryAgainMsg = "test_spark_20640_try_again"
+    // a server which delays response 50ms and must try twice for success.
+    def newShuffleServer(port: Int): (TransportServer, Int) = {
+      val attempts = new mutable.HashMap[String, Int]()
+      val handler = new NoOpRpcHandler {
+        override def receive(client: TransportClient, message: ByteBuffer,
```
--- End diff --

Updated. By the way, I am a little confused. First, when you insert a line break here, IntelliJ auto-indents like this:
```
override def receive(
  client: TransportClient
```
Second, in the same file, near line 1349, `fetchBlocks` is indented like this:
```
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener,
    shuffleFiles: Array[File]): Unit = {
```
[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18092#discussion_r122620196

--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
```
@@ -1281,6 +1286,59 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }

+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
+    val tryAgainMsg = "test_spark_20640_try_again"
+    // a server which delays response 50ms and must try twice for success.
+    def newShuffleServer(port: Int): (TransportServer, Int) = {
+      val attempts = new mutable.HashMap[String, Int]()
+      val handler = new NoOpRpcHandler {
+        override def receive(client: TransportClient, message: ByteBuffer,
+            callback: RpcResponseCallback): Unit = {
+          val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+          msgObj match {
+            case exec: RegisterExecutor =>
+              Thread.sleep(50)
+              val attempt = attempts.getOrElse(exec.execId, 0) + 1
+              attempts(exec.execId) = attempt
+              if (attempt < 2) {
+                callback.onFailure(new Exception(tryAgainMsg))
+                return
+              }
+              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+          }
+        }
+      }
+
+      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+      val transCtx = new TransportContext(transConf, handler, true)
+      (transCtx.createServer(port, Seq.empty[TransportServerBootstrap].asJava), port)
+    }
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
```
--- End diff --

No, because `startServiceOnPort` handles the conflicted-port case.
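The retry-on-conflict behavior mentioned above can be sketched in isolation. The helper below is a hypothetical minimal analogue written for illustration, not Spark's actual `Utils.startServiceOnPort` (which also reads the retry cap from configuration and logs each attempt):

```scala
import java.net.{BindException, ServerSocket}

// Hypothetical minimal analogue of a start-service-with-retry helper:
// try the candidate port, and on a bind conflict move to the next port.
def startServiceOnPort[T](candidate: Int, maxRetries: Int)(start: Int => T): (T, Int) = {
  var attempt = 0
  while (attempt <= maxRetries) {
    val port = candidate + attempt
    try {
      return (start(port), port)
    } catch {
      case _: BindException => attempt += 1
    }
  }
  throw new BindException(s"could not bind after $maxRetries retries")
}

// Occupy a port, then show that the helper retries past the conflict.
val blocker = new ServerSocket(0)   // let the OS pick a free port
val taken = blocker.getLocalPort
val (server, boundPort) = startServiceOnPort(taken, maxRetries = 3)(p => new ServerSocket(p))
server.close()
blocker.close()
```

This is why a random candidate port is acceptable in the test: a collision only shifts the bound port, it does not fail the test.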
[GitHub] spark issue #18092: [SPARK-20640][CORE]Make rpc timeout and retry for shuffl...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18092 ping @jiangxb1987
[GitHub] spark pull request #18144: [SPARK-20912][SQL] Allow column name in map funct...
Github user liyichao closed the pull request at: https://github.com/apache/spark/pull/18144
[GitHub] spark issue #18144: [SPARK-20912][SQL] Allow column name in map functions.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18144 Since the idea turned out not to be that good, this PR is closed.
[GitHub] spark pull request #18084: [SPARK-19900][core]Remove driver when relaunching...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18084#discussion_r121902338

--- Diff: core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---
```
@@ -588,6 +633,70 @@ class MasterSuite extends SparkFunSuite
   }
 }

+  test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
+    val conf = new SparkConf().set("spark.worker.timeout", "1")
+    val master = makeMaster(conf)
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
```
--- End diff --

Hi, this cannot be moved out, because `MasterStateResponse` changes over time. If we moved the RPC out of `eventually`, `masterState` would never change and the assert would fail. See the test `SPARK-20529:...` above; it contains the same `eventually` assert pattern.
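To make the point concrete, here is a standalone sketch, with hypothetical names and no Spark or ScalaTest dependency, of why the ask has to stay inside the `eventually` block: the by-name body is re-evaluated on every retry, whereas a snapshot taken once outside never changes.

```scala
// Minimal stand-in for ScalaTest's eventually: re-evaluate the body
// until it stops throwing, or give up after `retries` attempts.
def eventually[T](retries: Int)(body: => T): T = {
  var last: Throwable = new AssertionError("never ran")
  var i = 0
  while (i < retries) {
    try return body
    catch { case e: AssertionError => last = e; Thread.sleep(10); i += 1 }
  }
  throw last
}

@volatile var workerState = "ALIVE"          // mutated by a background event
def askMasterState(): String = workerState   // stand-in for askSync(RequestMasterState)

val snapshot = askMasterState()              // taken once, outside the loop: stays "ALIVE"

// background change, like the worker timing out in the real test
new Thread(() => { Thread.sleep(50); workerState = "DEAD" }).start()

// Correct: the query is re-issued on every retry, so it observes the change.
val seen = eventually(retries = 100) {
  val s = askMasterState()
  assert(s == "DEAD")
  s
}
```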
[GitHub] spark issue #18084: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18084 OK, another scenario:

* driver with driverId1 is started on worker1
* worker1 is lost
* master adds driverId1 to waitingDrivers
* worker1 reconnects and sends DriverStateChanged(driverId1), but the message is delayed in the network, **and it removes driverId1 from its local state**
* master starts driverId1 on worker1
* master receives the delayed message
[GitHub] spark issue #18084: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18084 Hi, adding a workerId may not work. For example, consider this scenario:

* driver with driverId1 is started on worker1
* worker1 is lost
* master adds driverId1 to waitingDrivers
* worker1 reconnects and sends DriverStateChanged(driverId1), but the message is delayed in the network
* master starts driverId1 on worker1
* master receives the delayed message

Now, what should the master do?
[GitHub] spark issue #18092: [SPARK-20640][CORE]Make rpc timeout and retry for shuffl...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18092 @JoshRosen Could you please look at the failed test? It seems unrelated to this PR.
[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18092#discussion_r121281612

--- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
```
@@ -1281,6 +1285,57 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }

+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
+    val shufflePort = 1
+    val tryAgainMsg = "test_spark_20640_try_again"
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.port", shufflePort.toString)
+    // a server which delays response 50ms and must try twice for success.
+    def newShuffleServer(): TransportServer = {
+      val attempts = new mutable.HashMap[String, Int]()
+      val handler = new NoOpRpcHandler {
+        override def receive(client: TransportClient, message: ByteBuffer,
+            callback: RpcResponseCallback): Unit = {
+          val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+          msgObj match {
+            case exec: RegisterExecutor =>
+              Thread.sleep(50)
+              val attempt = attempts.getOrElse(exec.execId, 0) + 1
+              attempts(exec.execId) = attempt
+              if (attempt < 2) {
+                callback.onFailure(new Exception(tryAgainMsg))
+                return
+              }
+              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+          }
+        }
+      }
+
+      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+      val transCtx = new TransportContext(transConf, handler, true)
+      transCtx.createServer(shufflePort, Nil.asInstanceOf[Seq[TransportServerBootstrap]].asJava)
+    }
+    newShuffleServer()
+
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+    var e = intercept[SparkException] {
+      makeBlockManager(8000, "executor1")
+    }.getMessage
+    assert(e.contains("TimeoutException"))
+
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+    e = intercept[SparkException] {
```
--- End diff --

Hi, what's your suggestion? When attempt < 2, we already return an error `tryAgainMsg`. The request must fail if the specified time has not passed, and succeed otherwise; there seems to be no choice other than `sleep`.
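The fail-once-then-succeed shape of the handler can be isolated from the transport layer. The following is a standalone sketch with illustrative names (it is not Spark's registration API): the server side fails each executor's first attempt, and the client-side retry loop shows why `maxAttempts = 1` surfaces the failure while `maxAttempts = 2` absorbs it.

```scala
import scala.collection.mutable

// Stand-in for the handler above: fail each executor's first registration
// attempt with tryAgainMsg, succeed from the second attempt on.
val attempts = mutable.HashMap[String, Int]()
val tryAgainMsg = "try_again"

def register(execId: String): Either[String, Unit] = {
  val n = attempts.getOrElse(execId, 0) + 1
  attempts(execId) = n
  if (n < 2) Left(tryAgainMsg) else Right(())
}

// Stand-in for the client-side maxAttempts loop.
def registerWithRetry(execId: String, maxAttempts: Int): Either[String, Unit] = {
  var last: Either[String, Unit] = Left("never tried")
  var i = 0
  while (i < maxAttempts && last.isLeft) {
    last = register(execId)
    i += 1
  }
  last
}

val one = registerWithRetry("executor1", maxAttempts = 1) // first failure surfaces
val two = registerWithRetry("executor2", maxAttempts = 2) // retry absorbs it
```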
[GitHub] spark issue #18070: [SPARK-20713][Spark Core] Convert CommitDenied to TaskKi...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18070 How about letting TaskCommitDenied and TaskKilled extend the same trait (for example, TaskKilledReason)? That way, when accounting metrics, both TaskCommitDenied and TaskKilled contribute to taskKilled rather than taskFailed.
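The proposal can be sketched as a small sealed hierarchy. The names mirror Spark's reason classes, but this is an illustrative standalone model, not Spark's actual `TaskEndReason` hierarchy:

```scala
// Illustrative sealed hierarchy: both "denied" and "killed" share a trait,
// so metrics accounting can treat them uniformly as killed, not failed.
sealed trait TaskFailedReason { def countTowardsTaskFailures: Boolean }

sealed trait TaskKilledReason extends TaskFailedReason {
  final def countTowardsTaskFailures: Boolean = false
}

case object TaskKilled extends TaskKilledReason
case class TaskCommitDenied(jobId: Int) extends TaskKilledReason
case object ExceptionFailure extends TaskFailedReason {
  def countTowardsTaskFailures: Boolean = true
}

// Metrics bucketing: one match arm covers every TaskKilledReason.
def bucket(reason: TaskFailedReason): String = reason match {
  case _: TaskKilledReason => "killed"
  case _                   => "failed"
}
```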
[GitHub] spark issue #18084: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18084 ping @jiangxb1987
[GitHub] spark issue #18070: [SPARK-20713][Spark Core] Convert CommitDenied to TaskKi...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18070 ping @tgravescs
[GitHub] spark issue #18092: [SPARK-20640][CORE]Make rpc timeout and retry for shuffl...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18092 ping @JoshRosen
[GitHub] spark pull request #18129: [SPARK-20365][YARN] Remove LocalSchem when add pa...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18129#discussion_r119309080

--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala ---
```
@@ -116,15 +116,16 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val cp = env("CLASSPATH").split(":|;|")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
       val uri = new URI(entry)
-      if (LOCAL_SCHEME.equals(uri.getScheme())) {
-        cp should contain (uri.getPath())
+      if (LOCAL_SCHEME.equals(uri.getScheme)) {
+        cp should contain (uri.getPath)
```
--- End diff --

It was IntelliJ that suggested the above changes; I will revert them.
[GitHub] spark pull request #18144: [SPARK-20912][SQL] Allow column name in map funct...
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/18144 [SPARK-20912][SQL] Allow column name in map functions.

## What changes were proposed in this pull request?

The `map` function accepts only Column values. It would be very helpful to have a variant that accepts a String for columns, just like `array` and `struct` do.

## How was this patch tested?

Added a test in `DataFrameFunctionsSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-20912

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18144.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18144

commit 64a98306c6e1e6e9f79a976044c6895779afb24c
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-30T14:35:28Z

    Allow column name in map functions.
[GitHub] spark pull request #18129: Remove LocalSchem when add path to ClassPath.
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/18129 Remove LocalSchem when add path to ClassPath.

## What changes were proposed in this pull request?

In Spark on YARN, when "spark.yarn.jars" is configured with local jars (jars whose URIs use the "local" scheme), we get an inaccurate classpath for the AM and containers. This is because we do not remove the "local" scheme when concatenating the classpath. It happens to work because the classpath is separated with ":" and Java treats "local" as a separate jar, but we can improve it by removing the scheme.

## How was this patch tested?

No tests.

cc @jerryshao

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-20365

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18129.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18129

commit 811f28b73df4a6ead1ea15872da3138c0996ec15
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-27T07:49:48Z

    Remove LocalSchem when add path to ClassPath.
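The fix described in the PR body boils down to dropping the scheme when an entry uses `local:`. A minimal sketch follows; `LOCAL_SCHEME` mirrors the constant's name in the YARN client, while the helper name `classPathEntry` is made up for this illustration:

```scala
import java.net.URI

// LOCAL_SCHEME mirrors the constant used in the YARN client code.
val LOCAL_SCHEME = "local"

// Hypothetical helper: for "local:" URIs, only the filesystem path
// belongs on the classpath; all other entries are kept as-is.
def classPathEntry(entry: String): String = {
  val uri = new URI(entry)
  if (LOCAL_SCHEME == uri.getScheme) uri.getPath else entry
}

val cp = Seq("local:/opt/jars/a.jar", "hdfs:/tmp/b.jar", "/plain/c.jar").map(classPathEntry)
```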
[GitHub] spark issue #18084: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18084 Hi, I've thought about this more thoroughly. The main state involved here is Master.workers, Master.idToWorker, and WorkerInfo.drivers.

Say `driverId1` runs on worker A. Assume A is network partitioned: the master calls removeWorker, which sets the worker's state to DEAD and removes the worker from persistenceEngine, but does not remove it from Master.workers. The driver is then launched on worker B.

When A reconnects, it re-registers with the master; the master removes the old WorkerInfo (whose `drivers` field is not empty) and adds a new WorkerInfo (say `wf_A`) whose drivers are empty. After registering, the worker re-syncs state with the master by sending `WorkerLatestState` containing `driverId1`; the master does not find it in `wf_A.drivers`, so it asks worker A to kill it. After killing the driver, worker A sends `DriverStateChanged(driverId1, DriverState.KILLED)`, and the master then mistakenly removes `driverId1`, which now runs on worker B.

How can we recognize that the `DriverStateChanged` came from worker A, not worker B? Maybe we can add a `workerId` field to `DriverStateChanged`, but is it possible that the second run of `driverId1` lands on worker A? Consider the following scenario:

1. worker A is network partitioned
2. master puts `driverId1` into waitingDrivers
3. worker A reconnects and registers
4. master launches `driverId1` on worker A
5. worker A's `WorkerLatestState(_, _, Seq(driverId1))` arrives at the master

Now, how does worker A handle the `LaunchDriver(driverId1)` when it is already running a driver with `driverId1`? And how does the master process `WorkerLatestState`? With the above message order, the master will send `KillDriver` to worker A; the worker will kill `driverId1`, which is the relaunched one, and send `DriverStateChanged` to the master, which will relaunch it again...

After all this, I think it is better to relaunch the driver with a new id, to keep things simple. As for the cost: `removeDriver` will be called anyway; if not here, it will be called when `DriverStateChanged` arrives. `persistenceEngine` has to be called because the persistent state `driver.id` changed, so the cost is justified. And `relaunchDriver` is called only when a worker or master goes down, which should be rare, because framework code is more stable than application code and so software bugs are less likely.
[GitHub] spark issue #18084: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18084 Thanks for the reply. I have added some more tests to verify the state of the master and worker after relaunching. I will think about whether there is a way to reuse the old driver struct.
[GitHub] spark pull request #18092: [SPARK-20640][CORE]Make rpc timeout and retry for...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18092#discussion_r118671187

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
```
@@ -170,11 +170,17 @@ private[spark] class BlockManager(
   // service, or just our own Executor's BlockManager.
   private[spark] var shuffleServerId: BlockManagerId = _

+  private val registrationTimeout =
+    conf.getTimeAsMs("spark.shuffle.registration.timeout", "5s")
```
--- End diff --

Updated and added a unit test.
[GitHub] spark issue #18092: [SPARK-20640][CORE]Make rpc timeout and retry for shuffl...
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/18092 Sorry, I thought it was not necessary to duplicate the message in JIRA; thanks for the suggestion. The PR is updated. As for the test plan: the modification seems straightforward and I cannot think of meaningful test cases. Are there any suggestions?
[GitHub] spark pull request #18084: [SPARK-19900][core]Remove driver when relaunching...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18084#discussion_r118436939

--- Diff: core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala ---
```
@@ -499,4 +500,103 @@ class MasterSuite extends SparkFunSuite
     assert(receivedMasterAddress === RpcAddress("localhost2", 1))
   }
 }
+
+  test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
+    val conf = new SparkConf().set("spark.worker.timeout", "1")
+    val master = makeMaster(conf)
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+
+    val app = DeployTestUtils.createAppDesc()
+    var appId = ""
+    val driverEnv1 = RpcEnv.create("driver1", "localhost", 22344, conf, new SecurityManager(conf))
+    val fakeDriver1 = driverEnv1.setupEndpoint("driver", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = driverEnv1
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredApplication(id, _) => appId = id
+      }
+    })
+    val drivers = new HashMap[String, String]
+    val workerEnv1 = RpcEnv.create("worker1", "localhost", 12344, conf, new SecurityManager(conf))
+    val fakeWorker1 = workerEnv1.setupEndpoint("worker", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = workerEnv1
+      override def receive: PartialFunction[Any, Unit] = {
+        case RegisteredWorker(masterRef, _, _) =>
+          masterRef.send(WorkerLatestState("1", Nil, drivers.keys.toSeq))
+        case LaunchDriver(id, desc) =>
+          drivers(id) = id
+          master.self.send(RegisterApplication(app, fakeDriver1))
+        case KillDriver(driverId) =>
+          master.self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
+          drivers.remove(driverId)
+      }
+    })
+    val worker1 = RegisterWorker(
+      "1",
+      "localhost",
+      ,
+      fakeWorker1,
+      10,
+      1024,
+      "http://localhost:8080",
+      RpcAddress("localhost2", 1))
+    master.self.send(worker1)
+    val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
+    master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+
+    eventually(timeout(10.seconds)) {
+      assert(!appId.isEmpty)
+    }
+
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.workers(0).state == WorkerState.DEAD)
+    }
+
+    val driverEnv2 = RpcEnv.create("driver2", "localhost", 22345, conf, new SecurityManager(conf))
+    val fakeDriver2 = driverEnv2.setupEndpoint("driver", new RpcEndpoint {
```
--- End diff --

Updated, please have a look.
[GitHub] spark pull request #18084: [SPARK-19900][core]Remove driver when relaunching...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18084#discussion_r118424700

--- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
@@ -796,9 +796,12 @@ private[deploy] class Master(
   }

   private def relaunchDriver(driver: DriverInfo) {
-    driver.worker = None
-    driver.state = DriverState.RELAUNCHING
-    waitingDrivers += driver
+    removeDriver(driver.id, DriverState.RELAUNCHING, None)
+    val newDriver = createDriver(driver.desc)
--- End diff --

First, we must distinguish the original driver from the newly relaunched one, because status updates for both versions may arrive at the master. For example, when a network-partitioned worker reconnects to the master, it sends `DriverStateChanged` with the driver id, and the master must recognize whether that state belongs to the original driver or to the newly launched one. The patch simply assigns a new driver id, which has some shortcomings, however: in the UI the two versions of the driver are not related, and the original's final state is `RELAUNCHING` (something like "relaunched" would read better). Another approach is to add something like an `attemptId` to the driver state and have `DriverStateChanged` carry that attemptId to identify which attempt it refers to. That seems more complex, and it also seems hard to add something like `attemptId` to the persisted driver state. What's your opinion? Looking forward to your feedback.
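The bookkeeping being debated above can be modeled without Spark. The sketch below is a hypothetical, self-contained model (the names `Master`, `relaunch`, and `driverStateChanged` mirror the real ones but are assumptions, not Spark's code): relaunching removes the old driver id from the active set, records its final state as RELAUNCHING, and registers a fresh id, so a late `DriverStateChanged` arriving from a partitioned worker for the old id is simply ignored.

```scala
import scala.collection.mutable

// Hypothetical model of the relaunch bookkeeping discussed in this thread.
object RelaunchModel {
  case class DriverInfo(id: String, state: String)

  final class Master {
    private var nextId = 0
    val drivers = mutable.Map[String, DriverInfo]()          // active drivers
    val completedDrivers = mutable.ListBuffer[DriverInfo]()  // terminal states

    def createDriver(): DriverInfo = {
      val d = DriverInfo(s"driver-$nextId", "RUNNING")
      nextId += 1
      drivers(d.id) = d
      d
    }

    // Relaunch: retire the old id with final state RELAUNCHING, register a new id.
    def relaunch(old: DriverInfo): DriverInfo = {
      drivers.remove(old.id).foreach(d => completedDrivers += d.copy(state = "RELAUNCHING"))
      createDriver()
    }

    // A stale state update for an already-relaunched (removed) id is a no-op.
    def driverStateChanged(id: String, state: String): Unit =
      drivers.remove(id).foreach(d => completedDrivers += d.copy(state = state))
  }
}
```

Under this model the two attempts never share an id, which is exactly why the late `KILLED` from the reconnecting worker cannot clobber the relaunched driver's state.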
[GitHub] spark pull request #18093: [SPARK-20774][SQL] Cancel all jobs when QueryExec...
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/18093 [SPARK-20774][SQL] Cancel all jobs when QueryExecution throws. See https://issues.apache.org/jira/browse/SPARK-20774?filter=12340455

## What changes were proposed in this pull request?

When a SQL query fails, all related jobs are canceled to release cluster resources.

## How was this patch tested?

No tests. cc @zsxwing

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-20774

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18093.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18093

commit 32e5c26e6861a4f14bcea552e68854a8b452ada7
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-22T08:59:46Z

    Cancel all jobs when QueryExecution throws. See https://issues.apache.org/jira/browse/SPARK-20774?filter=12340455
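The pattern this PR describes — cancel the query's jobs when execution throws — can be sketched in isolation. Everything below is a hypothetical model, not Spark's actual `QueryExecution` code; the `Ctx` trait and the `withJobGroupCancellation` helper are invented for illustration of the try/catch-and-cancel shape.

```scala
// Hypothetical sketch: run a body under a named job group; if it throws,
// cancel every job in that group before propagating the failure.
object CancelOnFailure {
  trait Ctx {
    def setJobGroup(group: String): Unit
    def cancelJobGroup(group: String): Unit
  }

  def withJobGroupCancellation[T](ctx: Ctx, group: String)(body: => T): T = {
    ctx.setJobGroup(group)
    try body
    catch {
      case e: Throwable =>
        ctx.cancelJobGroup(group) // release resources held by partially-run jobs
        throw e
    }
  }
}
```

The design choice is that cancellation happens on the failure path only; a successful query leaves its (already finished) jobs alone.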
[GitHub] spark pull request #18092: Make rpc timeout and retry for shuffle registrati...
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/18092 Make rpc timeout and retry for shuffle registration configurable.

## What changes were proposed in this pull request?

As the title says: make the RPC timeout and the retry count used during shuffle service registration configurable.

## How was this patch tested?

No tests. cc @sitalkedia

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-20640

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18092.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18092

commit 80e9ad9e02fbfd24bbd6d97e03b1bdf01e4c922c
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-24T17:42:43Z

    Make rpc timeout and retry for shuffle registration configurable.
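A minimal sketch of what "configurable timeout and retry" could look like. The configuration key names, defaults, and helper names below are assumptions for illustration; the keys actually introduced by the PR may differ.

```scala
// Hypothetical sketch: read shuffle-registration timeout/retry settings from a
// string-keyed config map, with defaults, and retry a registration attempt.
final case class ShuffleRegistrationConf(timeoutMs: Long, maxAttempts: Int)

object ShuffleRegistrationConf {
  // Key names are assumed for this sketch, not confirmed Spark keys.
  def fromMap(conf: Map[String, String]): ShuffleRegistrationConf =
    ShuffleRegistrationConf(
      conf.get("spark.shuffle.registration.timeout").map(_.toLong).getOrElse(5000L),
      conf.get("spark.shuffle.registration.maxAttempts").map(_.toInt).getOrElse(3))

  // Retry up to maxAttempts times; timeoutMs would bound each RPC in real code.
  def registerWithRetry[T](conf: ShuffleRegistrationConf)(attempt: () => T): T = {
    var result: Option[T] = None
    var lastErr: Throwable = null
    var tries = 0
    while (result.isEmpty && tries < conf.maxAttempts) {
      try result = Some(attempt())
      catch { case e: Throwable => lastErr = e; tries += 1 }
    }
    result.getOrElse(throw lastErr)
  }
}
```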
[GitHub] spark pull request #18084: [SPARK-19900][core]Remove driver when relaunching...
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/18084 [SPARK-19900][core]Remove driver when relaunching. This is a reopening of https://github.com/apache/spark/pull/17888. cc @cloud-fan @jiangxb1987

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-19900-1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18084.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18084
[GitHub] spark issue #17888: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/17888 Sorry, it seems I made a mistake when rebasing. I will open another PR.
[GitHub] spark pull request #17888: [SPARK-19900][core]Remove driver when relaunching...
Github user liyichao closed the pull request at: https://github.com/apache/spark/pull/17888
[GitHub] spark issue #17888: [SPARK-19900][core]Remove driver when relaunching.
Github user liyichao commented on the issue: https://github.com/apache/spark/pull/17888 Thanks for reviewing. Basically, the problem is that when a driver is relaunched and the original driver later reconnects, there will be an application that does not have a corresponding driver. I will try to add a test case.
[GitHub] spark pull request #18070: [SPARK-20713][Spark Core] Convert CommitDenied to...
Github user liyichao commented on a diff in the pull request: https://github.com/apache/spark/pull/18070#discussion_r118050668

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -338,6 +340,9 @@ private[spark] class Executor(
           metricsSystem = env.metricsSystem)
         threwException = false
         res
+      } catch {
+        case _: CommitDeniedException =>
+          throw new TaskKilledException("commit denied")
--- End diff --

Maybe we should throw a more specific exception for the "already committed" case? The executor knows about the "already committed" case before it sends the statusUpdate, so we do not need to wait for the driver's statusUpdate. It also seems OK to fix this up on the driver side; I'll try that tomorrow.
[GitHub] spark pull request #18070: Convert CommitDenied to TaskKilled.
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/18070 Convert CommitDenied to TaskKilled.

## What changes were proposed in this pull request?

In the executor, `CommitDeniedException` is converted to `TaskKilledException` to avoid an inconsistent task state, because there is a race between the driver killing the task and the executor trying to commit.

## How was this patch tested?

No tests, because the change is straightforward.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-20713

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18070.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18070

commit 3467d5d2c8dc60654575d4aecd2771a74a0f3fea
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-23T09:39:17Z

    Convert CommitDenied to TaskKilled.
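The conversion this PR describes can be modeled in a few lines. The sketch below is a hypothetical model, not Spark's `Executor` code: a commit-denied failure escaping the task body is rethrown as a kill, so the executor reports the task as killed rather than failed, matching the driver's view after it revoked the commit.

```scala
// Hypothetical model of converting a commit-denied failure into a task kill.
object CommitDeniedModel {
  class CommitDeniedException(msg: String) extends Exception(msg)
  class TaskKilledException(val reason: String) extends Exception(reason)

  // Run a task body; surface commit denial as "killed" instead of "failed".
  def runTask[T](body: => T): T =
    try body
    catch {
      case _: CommitDeniedException => throw new TaskKilledException("commit denied")
    }
}
```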
[GitHub] spark pull request #17888: [SPARK-19900][core]Remove driver when relaunching...
GitHub user liyichao opened a pull request: https://github.com/apache/spark/pull/17888 [SPARK-19900][core]Remove driver when relaunching.

## What changes were proposed in this pull request?

* remove failed apps when the worker goes down
* do not reuse the driver id when relaunching a driver

## How was this patch tested?

Manual tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liyichao/spark SPARK-19900

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17888.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17888

commit 2992b00a7f200545cc5474a591659735b61866fb
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-07T08:36:58Z

    Remove driver when relaunching.

commit 44baeb3b94b6ecf3ed9b60e18f4e5dde0bdf06dd
Author: Li Yichao <l...@zhihu.com>
Date: 2017-05-07T09:34:44Z

    Add some.