[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20807


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r175156772
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -418,7 +418,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def sparkContextInitialized(sc: SparkContext) = {
+// Notify runDriver function that SparkContext is available
 sparkContextPromise.success(sc)
+// Pause the user class thread in order to make proper initialization in runDriver function.
+// When it happened the thread has to be resumed with resumeDriver function.
+sparkContextPromise.synchronized {
--- End diff --

Fixed.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r175156659
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 // if the user app did not create a SparkContext.
 throw new IllegalStateException("User did not initialize spark context!")
   }
+  // After initialisation notify user class thread to continue
--- End diff --

Moved.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r175151620
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 // if the user app did not create a SparkContext.
 throw new IllegalStateException("User did not initialize spark context!")
   }
+  // After initialisation notify user class thread to continue
--- End diff --

You should remove this comment and add one to the `resumeDriver` method.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r175151438
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -418,7 +418,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def sparkContextInitialized(sc: SparkContext) = {
+// Notify runDriver function that SparkContext is available
 sparkContextPromise.success(sc)
+// Pause the user class thread in order to make proper initialization in runDriver function.
+// When it happened the thread has to be resumed with resumeDriver function.
+sparkContextPromise.synchronized {
--- End diff --

Hmm, there's a race now.

You're updating the promise outside the lock, so it's possible that the 
`runDriver` thread sees the completed promise and notifies the lock before this 
thread grabs it; in that case this thread would hang forever.
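
A self-contained sketch of the safe ordering (a generic `Promise[Int]` stands in for Spark's `sparkContextPromise`; the `resumeDriver` name follows this thread, everything else is illustrative): completing the promise *inside* the synchronized block guarantees the waiter already holds the lock before the other thread can acquire it and call `notify()`, so the notification cannot be lost.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object LostNotifyDemo {
  // Hypothetical stand-in for Spark's sparkContextPromise.
  private val promise = Promise[Int]()

  // User-class thread: publish the value, then pause until resumed.
  def initialized(value: Int): Unit = promise.synchronized {
    // success() is called while holding the lock, so the other thread
    // cannot enter the synchronized block and notify() before wait()
    // starts below. (Production code would also loop on a condition
    // flag to guard against spurious wakeups.)
    promise.success(value)
    promise.wait()
  }

  // Driver thread: wake the paused user-class thread.
  def resumeDriver(): Unit = promise.synchronized {
    promise.notify()
  }

  def run(): Int = {
    val userThread = new Thread(() => initialized(42))
    userThread.start()
    // Blocks until success() is called; resumeDriver() then blocks on
    // the monitor until wait() releases it, so the notify is never lost.
    val value = Await.result(promise.future, Duration.Inf)
    resumeDriver()
    userThread.join()
    value
  }
}
```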


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174983468
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 // if the user app did not create a SparkContext.
 throw new IllegalStateException("User did not initialize spark context!")
   }
+  // After initialisation notify user class thread to continue
+  synchronized { notify() }
--- End diff --

Moved into `resumeDriver` function right below `sparkContextInitialized`.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174983449
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 }
   }
 
-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext) = synchronized {
--- End diff --

Used `sparkContextPromise` as lock.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174983476
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 // if the user app did not create a SparkContext.
 throw new IllegalStateException("User did not initialize spark context!")
   }
+  // After initialisation notify user class thread to continue
--- End diff --

Fixed and switched to US spell checker.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174964951
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 }
   }
 
-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext) = synchronized {
--- End diff --

Some other code in this class uses synchronization on `this`, so I think it 
would be better to synchronize on `sparkContextPromise` in this case.
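
The concern, sketched in isolation (names are illustrative, not Spark's): every method that synchronizes on `this` shares one monitor, so a dedicated lock object keeps the pause/resume handshake private and out of the way of unrelated synchronized code.

```scala
class Coordinator {
  // Dedicated lock object: only the pause/resume handshake uses it,
  // so other code synchronizing on `this` cannot interfere with it.
  private val contextLock = new Object

  @volatile private var resumed = false

  // Pauses the calling thread until resume() is invoked.
  def pause(): Unit = contextLock.synchronized {
    // Loop guards against spurious wakeups.
    while (!resumed) contextLock.wait()
  }

  // Wakes the paused thread.
  def resume(): Unit = contextLock.synchronized {
    resumed = true
    contextLock.notify()
  }

  // Unrelated code that locks `this` never touches contextLock.
  def otherWork(): Int = synchronized { 1 + 1 }
}
```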


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174965130
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 // if the user app did not create a SparkContext.
 throw new IllegalStateException("User did not initialize spark context!")
   }
+  // After initialisation notify user class thread to continue
+  synchronized { notify() }
--- End diff --

Since you have to do this in two places, I'd create a method (e.g. 
`resumeDriver`) close to where `sparkContextInitialized` is declared, so that 
it's easier to find the context of why this is needed.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174965182
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
 // if the user app did not create a SparkContext.
 throw new IllegalStateException("User did not initialize spark context!")
   }
+  // After initialisation notify user class thread to continue
--- End diff --

nit: rest of the code uses American spelling ("initialization"), so this 
should be consistent.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-13 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174045577
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
   executorIdCounter += 1
   val executorHostname = container.getNodeId.getHost
   val containerId = container.getId
-  val executorId = executorIdCounter.toString
+  val executorId = (initialExecutorIdCounter + executorIdCounter).toString
--- End diff --

Yeah, this is an issue only when the application is quite fast. Do you have 
concerns with solving this in general, or with the fix in the first commit? I'm 
asking because pausing the user class thread would definitely be better, as 
I've written below.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-13 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174035432
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
   executorIdCounter += 1
   val executorHostname = container.getNodeId.getHost
   val containerId = container.getId
-  val executorId = executorIdCounter.toString
+  val executorId = (initialExecutorIdCounter + executorIdCounter).toString
--- End diff --

I get the point of the fix, but it also seems a little strange to me. 

Besides, do we really need to fix this issue? As far as I know, the case here 
is not a normal one.


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-13 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174032329
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
   executorIdCounter += 1
   val executorHostname = container.getNodeId.getHost
   val containerId = container.getId
-  val executorId = executorIdCounter.toString
+  val executorId = (initialExecutorIdCounter + executorIdCounter).toString
--- End diff --

The initial problem was that `initialExecutorIdCounter` comes from the driver, 
which has already stopped. Making it lazy solved this. The other integer is 
necessary because a `lazy var` is not possible.
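
A minimal, self-contained sketch of that scheme (names are hypothetical; in the real code the initial value comes from the driver over RPC): the `lazy val` defers the possibly-failing fetch until first use, and because Scala has no `lazy var`, the mutable part is kept as a separate counter added on top of the lazy base value.

```scala
object LazyIdDemo {
  var fetchCount = 0

  // Hypothetical stand-in for asking the driver for the starting id.
  // A lazy val runs this body on first access, not in the constructor,
  // so construction succeeds even if the driver is already gone.
  lazy val initialExecutorIdCounter: Int = {
    fetchCount += 1
    100
  }

  // Scala has no `lazy var`, so the mutable increment lives here.
  var executorIdCounter = 0

  def nextExecutorId(): String = {
    executorIdCounter += 1
    (initialExecutorIdCounter + executorIdCounter).toString
  }
}
```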


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-13 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/20807#discussion_r174027869
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
   executorIdCounter += 1
   val executorHostname = container.getNodeId.getHost
   val containerId = container.getId
-  val executorId = executorIdCounter.toString
+  val executorId = (initialExecutorIdCounter + executorIdCounter).toString
--- End diff --

it seems a bit strange to me to "add" the Ids?
@vanzin @jerryshao 


---




[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...

2018-03-12 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/20807

SPARK-23660: Fix exception in yarn cluster mode when application ended fast

## What changes were proposed in this pull request?

Yarn throws the following exception in cluster mode when the application is 
really small:

```
18/03/07 23:34:22 WARN netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7c974942 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@1eea9d2d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
18/03/07 23:34:22 ERROR yarn.ApplicationMaster: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:102)
at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:450)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:493)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:809)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:834)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:158)
at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
... 17 more
18/03/07 23:34:22 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: )
```

Example application:

```
object ExampleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ExampleApp")
    val sc = new SparkContext(conf)
    try {
      // Do nothing
    } finally {
      sc.stop()
    }
  }
}
```

This PR makes `initialExecutorIdCounter` lazy. This way `YarnAllocator` 
can be instantiated even if the driver has already ended.

## How was this patch tested?

Automated: Additional unit test added
Manual: Application submitted to a small cluster


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-23660

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20807


commit 114ac05102c9d563c922447423ec8445bb37e9ef
Author: Gabor Somogyi 
Date:   2018-03-13T04:23:59Z

SPARK-23660: Fix exception in yarn cluster mode when application ended fast




---
