[GitHub] spark pull request: [Yarn][minor]Fix: avoid printing InterruptedEx...

2015-04-13 Thread li-zhihui
Github user li-zhihui closed the pull request at:

https://github.com/apache/spark/pull/5451


[GitHub] spark pull request: [Yarn][minor]Fix: avoid printing InterruptedEx...

2015-04-10 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/5451#issuecomment-91730112
  
@srowen @vanzin Thanks for your comments.
I added a new commit addressing your comments.


[GitHub] spark pull request: [Yarn][minor]Fix: avoid printing InterruptedEx...

2015-04-10 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/5451#issuecomment-91732744
  
@srowen I guess the exception may make users feel puzzled because it's unexpected, although it doesn't affect application execution.
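
For illustration, a minimal sketch of the kind of change this describes (the thread body and names are assumptions, not the actual patch): catch the InterruptedException that is expected on shutdown instead of letting its stack trace reach the log.

    // Hypothetical reporter loop; the interrupt is the normal shutdown
    // signal, so exit quietly rather than printing the exception.
    val reporter = new Thread {
      override def run(): Unit = {
        try {
          while (!Thread.currentThread().isInterrupted) {
            // ... poll the YARN ResourceManager for status ...
            Thread.sleep(1000)
          }
        } catch {
          case _: InterruptedException => // expected during shutdown
        }
      }
    }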


[GitHub] spark pull request: Fix string interpolator error in HeartbeatRece...

2015-03-29 Thread li-zhihui
GitHub user li-zhihui opened a pull request:

https://github.com/apache/spark/pull/5255

Fix string interpolator error in HeartbeatReceiver

Error log before the fix:
`15/03/29 10:07:25 ERROR YarnScheduler: Lost an executor 24 (already removed): Executor heartbeat timed out after ${now - lastSeenMs} ms`
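
The root cause is a missing `s` string-interpolator prefix; a minimal illustration (assuming `now` and `lastSeenMs` are Longs in scope):

    val now = System.currentTimeMillis()
    val lastSeenMs = now - 120000L
    // Without the `s` prefix Scala does not interpolate, so the literal
    // text "${now - lastSeenMs}" ends up in the log:
    val wrong = "Executor heartbeat timed out after ${now - lastSeenMs} ms"
    // With the interpolator the computed value is substituted:
    val right = s"Executor heartbeat timed out after ${now - lastSeenMs} ms"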

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/li-zhihui/spark fixstringinterpolator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/5255.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5255


commit c93f2b7c88c0a704c2bee978f8c8f57e18ed04e8
Author: Li Zhihui zhihui...@intel.com
Date:   2015-03-30T01:27:16Z

Fix string interpolator error in HeartbeatReceiver




[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2015-02-26 Thread li-zhihui
Github user li-zhihui closed the pull request at:

https://github.com/apache/spark/pull/1462


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2015-02-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1462#discussion_r25405036
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---
@@ -62,6 +62,11 @@ private[spark] class MesosSchedulerBackend(
 
   var classLoader: ClassLoader = null
 
+  if (!sc.getConf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
--- End diff --

Done, thanks.


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2015-02-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1462#discussion_r25405040
  
--- Diff: docs/configuration.md ---
@@ -831,7 +831,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>0</td>
   <td>
 The minimum ratio of registered resources (registered resources / total expected resources)
-(resources are executors in yarn mode, CPU cores in standalone mode)
+(resources are executors in yarn mode, CPU cores in standalone mode and coarse mesos mode)
--- End diff --

Done, thanks.
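
For context, a usage sketch of the property being documented (values are illustrative, not recommendations):

    import org.apache.spark.SparkConf

    // Wait until at least 80% of expected resources have registered before
    // scheduling begins, giving up after 30 seconds.
    val conf = new SparkConf()
      .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
      .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30000")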


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2015-02-25 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-76123507
  
Added some new commits to fix a code conflict and some issues.


[GitHub] spark pull request: Modify default value description for spark.sch...

2015-02-25 Thread li-zhihui
GitHub user li-zhihui opened a pull request:

https://github.com/apache/spark/pull/4781

Modify default value description for spark.scheduler.minRegisteredResourcesRatio in docs.

The configuration is not currently supported in mesos mode.
See https://github.com/apache/spark/pull/1462


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/li-zhihui/spark fixdocconf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/4781.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4781


commit 63e7a44b35c96373458bb659260b755380519e10
Author: Li Zhihui zhihui...@intel.com
Date:   2015-02-26T05:18:09Z

Modify default value description for spark.scheduler.minRegisteredResourcesRatio on docs.




[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2015-02-11 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-74014949
  
@pwendell Do we need the feature in mesos mode? I would be glad to update it.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-10-23 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r19321179
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -322,14 +322,14 @@ private[spark] class Executor(
   // Fetch missing dependencies
   for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
 logInfo("Fetching " + name + " with timestamp " + timestamp)
-Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-  hadoopConf)
+Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+  env.securityManager, hadoopConf, timestamp, useCache = true)
 currentFiles(name) = timestamp
   }
   for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
 logInfo("Fetching " + name + " with timestamp " + timestamp)
-Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-  hadoopConf)
+Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+  env.securityManager, hadoopConf, timestamp, useCache = true)
--- End diff --

Thanks @andrewor14, done.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-10-23 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-60346005
  
@andrewor14 I suspect the failure is unrelated to the patch, but I don't know why it failed again. Can you give me some advice?


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-10-21 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-60025907
  
@andrewor14 more comments?


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-10-07 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r18564683
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
+   * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
+   * across executors running the same application. `useCache` is used mainly for
+   * the the executors, not in local mode.
--- End diff --

Done, thanks.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-10-07 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r18564686
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
+   * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
+   * across executors running the same application. `useCache` is used mainly for
+   * the the executors, not in local mode.
+   *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-    hadoopConf: Configuration) {
-    val filename = url.split("/").last
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
--- End diff --

Done, thanks.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-10-07 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r18564689
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * Download a file to target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
+   * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
+   * across executors running the same application. `useCache` is used mainly for
+   * the the executors, not in local mode.
+   *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-    hadoopConf: Configuration) {
-    val filename = url.split("/").last
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo(("File %s exists and does not match contents of %s, " +
+            "replacing it with %s").format(targetFile, url, url))
--- End diff --

Done, thanks.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-18 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-56126098
  
@andrewor14 any more comments?


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-17 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-55852899
  
@andrewor14 @JoshRosen 
I am not sure if the test failure is related to the patch. Can you have a 
look at the failure? Or just retest it?


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-17 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-55856239
  
@JoshRosen thanks


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-16 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17643781
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,83 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
--- End diff --

Done.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-16 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17643825
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -322,14 +322,14 @@ private[spark] class Executor(
   // Fetch missing dependencies
   for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
 logInfo("Fetching " + name + " with timestamp " + timestamp)
-Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-  hadoopConf)
+Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+  env.securityManager, hadoopConf, timestamp, true)
--- End diff --

Done.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-15 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17578863
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
--- End diff --

This PR tries to avoid network blocking (multiple executors download a file over the network to local storage only once). Each executor still fetches files into its own work directory, but from local storage rather than from a remote node over the network.
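
A simplified sketch of that download-once pattern (condensed from the quoted diff; the helper name is illustrative):

    import java.io.{File, RandomAccessFile}

    // Whichever executor acquires the file lock first downloads over the
    // network; the others block on the lock, then find the cached copy on
    // local storage and skip the network fetch entirely.
    def fetchThroughCache(cachedFile: File, lockFile: File)(download: () => Unit): Unit = {
      val raf = new RandomAccessFile(lockFile, "rw")
      val lock = raf.getChannel.lock() // blocks until the holder releases it
      try {
        if (!cachedFile.exists()) download() // only the first caller pays the network cost
      } finally {
        lock.release()
        raf.close()
      }
    }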


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-14 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17524360
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
--- End diff --

This code just follows
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L354
I think we need to fetch the file first and then compare the cached file's contents with the target file's contents (`!Files.equal(cachedFile, targetFile)`), because we can't be sure the file's contents are unchanged just because the `timestamp` changed.
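
In other words (a small sketch of the check, with an illustrative helper name):

    import java.io.File
    import com.google.common.io.Files

    // A changed timestamp does not prove changed bytes, so compare actual
    // contents before deciding whether to overwrite the target file.
    def needsReplacement(cachedFile: File, targetFile: File): Boolean =
      targetFile.exists() && !Files.equal(cachedFile, targetFile)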


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-14 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17524413
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -805,11 +805,12 @@ class SparkContext(config: SparkConf) extends Logging {
   case "local" => "file:" + uri.getPath
   case _ => path
 }
-addedFiles(key) = System.currentTimeMillis
+val timestamp = System.currentTimeMillis
+addedFiles(key) = timestamp
 
 // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
 Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-  hadoopConfiguration)
+  hadoopConfiguration, timestamp, useCache = false)
--- End diff --

It's for `DAGScheduler.runLocally()`. I think it doesn't need the cache because it only runs once, in the driver's JVM container.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-14 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17524670
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo(("File %s exists and does not match contents of %s, " +
+            "replacing it with %s").format(targetFile, url, url))
+        } else {
+          throw new SparkException(
+            "File " + targetFile + " exists and does not match contents of " + url)
--- End diff --

thanks, done.


[GitHub] spark pull request: [SPARK-2872] Fix conflict between code and doc...

2014-09-14 Thread li-zhihui
Github user li-zhihui closed the pull request at:

https://github.com/apache/spark/pull/1684


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-10 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-55084516
  
Thanks @chenghao-intel. It seems the SQL unit test failure is fixed.

@andrewor14 Can you retest this?


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-08 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17278282
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,14 +313,74 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = url.hashCode + timestamp + "_cach"
+      val lockFileName = url.hashCode + timestamp + "_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, conf, securityMgr, hadoopConf)
+          Files.move(new File(localDir, fileName), cachedFile)
+        }
+      } finally {
+        lock.release()
+      }
+      Files.copy(cachedFile, targetFile)
--- End diff --

I think it's OK, but right now executors use these files from their work directory (`./`). Maybe we can optimize away the copy in a follow-up patch once we have proven this patch works well.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-08 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-54914557
  
@andrewor14 
In yarn mode these cache files are cleaned up automatically; in standalone mode this is not handled.

Currently in standalone mode the application work directory (`SPARK_HOME/work/APPLICATION_ID`) on slave nodes is not cleaned up either. I think if that issue (cleaning up the application work directory) were resolved, we could use the application work directory as the cache directory.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-08 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17278861
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,14 +313,74 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared across
+   * executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = url.hashCode + timestamp + "_cach"
--- End diff --

done


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-05 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-54600822
  
@JoshRosen @andrewor14
I use `url.hashCode + timestamp` as the `cachedFileName`; I believe it is impossible for a `url.hashCode` collision and a `timestamp` collision to occur at the same time.
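
A sketch of that naming scheme (the URL is illustrative): a wrong cache hit would require both a hashCode collision and an identical add-time timestamp.

    val url = "hdfs:///apps/demo/dep.jar"
    val timestamp = System.currentTimeMillis()
    val cachedFileName = s"${url.hashCode}${timestamp}_cache"
    val lockFileName = s"${url.hashCode}${timestamp}_lock"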


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-04 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r17097313
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -317,13 +317,58 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Copy cached file to targetDir, if not exists, download it from url firstly.
+   * If useCache == false, download file to targetDir directly.
--- End diff --

Primarily I want to keep the util method flexible, because we made the old `fetchFile` private. But I am not sure whether the tag is necessary either.
@JoshRosen What do you think about it?


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-09-03 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-54408600
  
@JoshRosen @andrewor14 

I tested the patch in yarn mode, and `localDir` is a per-application temporary directory in this mode. Now I see it is a problem in standalone (and mesos) mode.
`targetDir` (from `SparkFiles.getRootDirectory`) is a per-executor temporary directory (in my case it is `/home/frank/hdfs/yarn/nm-local-dir/usercache/frank/appcache/application_1409795343243_0002/container_1409795343243_0002_01_000126/./`), so I think we can use `targetDir + "../"` as a per-application directory to save the cache file.

BTW: the `timestamp` follows this code's logic:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L323
although I don't understand why the timestamp could change within an application's lifetime.
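
A sketch of that idea (an assumption, not the final patch): derive a per-application cache directory from the per-executor work directory by going one level up.

    import java.io.File
    import org.apache.spark.SparkFiles

    val executorDir = new File(SparkFiles.getRootDirectory) // per-executor
    val appCacheDir = executorDir.getParentFile             // per-application candidate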


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-08-26 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-53519868
  
@JoshRosen do you have time to review it?


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2014-08-25 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-53229624
  
Rolled back the old commits and added a new commit based on the latest code.

@pwendell @tgravescs @kayousterhout @tnachen 


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-08-20 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-52873144
  
@JoshRosen any more comments?


[GitHub] spark pull request: [SPARK-2872] Fix conflict between code and doc...

2014-08-20 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1684#issuecomment-52873228
  
@tgravescs 


[GitHub] spark pull request: [SPARK-2872] Fix conflict between code and doc...

2014-08-06 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1684#issuecomment-51420203
  
@tgravescs 
The test failed because FileServerSuite hit "Build timed out (after 120 minutes)", but I believe the patch is unrelated to that issue.

Can you ask jenkins to test this again?


[GitHub] spark pull request: Fix conflict between code and doc in YarnClien...

2014-08-05 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1684#issuecomment-51282316
  
@tgravescs got it.

But `YarnClusterSchedulerBackend` uses the same strategy (code and comment):

https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala#L41
I think we should fix it so the env variable overrides the config there too.
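
For context, a self-contained sketch of the precedence question (the helper and env map are illustrative assumptions, not the actual patch): with the ordering below the SparkConf setting, when present, overrides the environment variable; swapping the two reads inverts the precedence. The point of the PR is that code, comments, and docs must agree on one order.

    import org.apache.spark.SparkConf

    def resolveNumExecutors(conf: SparkConf, env: Map[String, String], default: Int): Int = {
      val fromEnv = env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(default)
      // Config (system property) wins over the env variable in this order.
      conf.getInt("spark.executor.instances", fromEnv)
    }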


[GitHub] spark pull request: Fix conflict between code and doc in YarnClien...

2014-08-05 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1684#issuecomment-51283355
  
@tgravescs I rolled back the previous commit and added a new commit that just updates the comment.


[GitHub] spark pull request: [SPARK-2872] Fix conflict between code and doc...

2014-08-05 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1684#issuecomment-51285402
  
@tgravescs done


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-08-04 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-51051752
  
@JoshRosen added a comment.


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-08-04 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-51147815
  
Thanks @JoshRosen, sorry I missed that important operation (and I missed `FileUtil.chmod(targetFile.getAbsolutePath, "a+x")` too).

I added a new commit.


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-08-03 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15738382
  
--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -40,6 +41,10 @@ private[spark] class YarnClusterSchedulerBackend(
 }
 // System property can override environment variable.
 numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
-totalExpectedExecutors.set(numExecutors)
+totalExpectedExecutors = numExecutors
--- End diff --

@kayousterhout done


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-08-03 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15738384
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
--- End diff --

@kayousterhout done


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-08-03 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-51011136
  
As @pwendell says, the configuration is disabled in standalone mode. And in the worst case it sleeps for `spark.scheduler.maxRegisteredResourcesWaitingTime` (just as currently recommended). So my opinion is to keep the configuration in all deploy modes (yarn, standalone, mesos).


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-08-03 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1616#issuecomment-51011405
  
@JoshRosen more comments?


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2014-08-03 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-51019363
  
@pwendell removing support for this in standalone mode is just a matter of keeping totalExpectedExecutors at zero.

https://github.com/li-zhihui/spark/commit/fa5af15d982e86c880302e8b9ef38645944be13f

I think it just makes Spark easier to use. (And sometimes users aren't aware of the problem unless we point it out in the docs or conf.) Anyway, I think you are the authority on how to make the tradeoff.


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-08-02 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-50962993
  
Maybe we should think about this feature in standalone mode and mesos mode 
together. 
Is it necessary in mesos mode? https://github.com/apache/spark/pull/1462 
@tnachen


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: Fix conflict between code and doc in YarnClien...

2014-07-31 Thread li-zhihui
GitHub user li-zhihui opened a pull request:

https://github.com/apache/spark/pull/1684

Fix conflict between code and doc in YarnClientSchedulerBackend.scala

The doc says: system properties override environment variables.

https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L71

But the code conflicts with it. A sketch of the documented precedence follows.
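
A minimal sketch of the documented precedence (hypothetical helper, not the 
Spark code): the system property, when present, should win.

    // Sketch: system property > environment variable > default.
    def resolveSetting(sysProp: String, envVar: String, default: String): String =
      sys.props.get(sysProp)
        .orElse(sys.env.get(envVar))
        .getOrElse(default)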

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/li-zhihui/spark fixaddarg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1684.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1684


commit f1be1f01bc86abb3b86461b49842f704e262d279
Author: Li Zhihui zhihui...@intel.com
Date:   2014-07-31T06:27:30Z

Fix conflict between code and doc in YarnClientSchedulerBackend.scala




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix conflict between code and doc in YarnClien...

2014-07-31 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1684#issuecomment-50837342
  
@tgravescs can you please have a look at this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-31 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-50850584
  
@tgravescs @kayousterhout can you close this PR before the code freeze of the 1.1 
release? Otherwise, it would result in an incompatible configuration property 
name, because the PR renames 
<code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code> to 
<code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-07-28 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1616#discussion_r15447773
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -317,6 +317,28 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Copy cached file to targetDir, if not exists, download it from url.
+   */
+  def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, 
securityMgr: SecurityManager,
+timestamp: Long) {
+val fileName = url.split("/").last
+val cachedFileName = fileName + timestamp
+val targetFile = new File(targetDir, fileName)
+val lockFileName = fileName + timestamp + "_lock"
+val localDir = new File(getLocalDir(conf))
+val lockFile = new File(localDir, lockFileName)
+val raf = new RandomAccessFile(lockFile, "rw")
+val lock = raf.getChannel().lock() // only one executor entry
+val cachedFile = new File(localDir, cachedFileName)
+if (!cachedFile.exists()) {
+  fetchFile(url, localDir, conf, securityMgr)
+  Files.move(new File(localDir, fileName), cachedFile)
+}
+lock.release()
--- End diff --

thanks @harishreedharan done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2014-07-28 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-50423094
  
@tnachen I add a new PR to try to fix the issue, 
https://github.com/apache/spark/pull/1525


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2713] Executors of same application in ...

2014-07-27 Thread li-zhihui
GitHub user li-zhihui opened a pull request:

https://github.com/apache/spark/pull/1616

[SPARK-2713] Executors of same application in same host should only 
download files & jars once

If Spark launched multiple executors on one host for one application, every 
executor would download its dependent files and jars (if not using a local: url) 
independently. That can result in huge latency. In my case, it caused 20 
seconds of latency to download the dependent jars (about 17M) when I launched 32 
executors on every host (4 hosts in total).

This patch caches downloaded files and jars so that executors reduce network 
throughput and download latency. In my case, the latency was reduced 
from 20 seconds to less than 1 second. A minimal sketch of the locking idea 
follows.
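
A minimal sketch of the locking idea (hypothetical helper names; the real 
change is in the diff reviewed later in this thread):

    import java.io.{File, RandomAccessFile}

    // Executors on one host serialize on a lock file; only the first one
    // performs the download, the rest find the cached copy already present.
    def fetchOnce(cachedFile: File, lockFile: File)(download: => Unit): Unit = {
      val raf = new RandomAccessFile(lockFile, "rw")
      val lock = raf.getChannel.lock()  // exclusive across processes on this host
      try {
        if (!cachedFile.exists()) download
      } finally {
        lock.release()
        raf.close()
      }
    }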

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/li-zhihui/spark cachefiles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1616


commit 967817e082d72fbd7c7150780ceeba6f69588c34
Author: Li Zhihui zhihui...@intel.com
Date:   2014-07-28T01:41:11Z

Executors of same application in same host should only download files & 
jars once

commit 0bb6224ee33ef2639e75fdfc1396f84f1821ec77
Author: Li Zhihui zhihui...@intel.com
Date:   2014-07-28T01:54:20Z

Release lock before copy files




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-23 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-49836422
  
I added a new commit, @tgravescs @markhamstra 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-23 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15272948
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -268,14 +264,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 }
   }
 
+  def sufficientResourcesRegistered(): Boolean = true
+
   override def isReady(): Boolean = {
-if (ready) {
+if (sufficientResourcesRegistered) {
+  logInfo("SchedulerBackend is ready for scheduling beginning" +
", total expected resources: " + totalExpectedResources.get() +
", minRegisteredResourcesRatio: " + minRegisteredRatio)
   return true
 }
if ((System.currentTimeMillis() - createTime) >= 
maxRegisteredWaitingTime) {
-  ready = true
   logInfo("SchedulerBackend is ready for scheduling beginning after 
waiting " +
-"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+"maxRegisteredResourcesWaitingTime(ms): " + 
maxRegisteredWaitingTime)
--- End diff --

@markhamstra thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-23 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15325405
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  totalExpectedResources.getAndSet(maxCores.getOrElse(0))
--- End diff --

oops, thanks @kayousterhout 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-23 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-49957064
  
@kayousterhout I think using totalExpectedCores and totalExpectedExecutors 
to replace totalExpectedResources is a good idea, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-23 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-49960668
  
@tgravescs @kayousterhout I added a new commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread li-zhihui
GitHub user li-zhihui opened a pull request:

https://github.com/apache/spark/pull/1525

Fix race condition at SchedulerBackend.isReady in standalone mode

In SPARK-1946 (PR #900), the configuration 
<code>spark.scheduler.minRegisteredExecutorsRatio</code> was introduced. 
However, in standalone mode, there is a race condition where isReady() can 
return true because totalExpectedExecutors has not been correctly set.

Because the number of expected executors is uncertain in standalone mode, this 
PR tries to use CPU cores (<code>--total-executor-cores</code>) as the expected 
resources to judge whether the SchedulerBackend is ready. A sketch of the race 
follows.
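
A sketch of the race, under the simplifying assumption that readiness is a 
plain ratio test:

    // totalExpectedExecutors is never set in standalone mode, so it stays 0
    // and the ratio test passes vacuously: isReady() returns true too early.
    def sufficientExecutorsRegistered(registered: Int, expected: Int,
                                      minRatio: Double): Boolean =
      registered >= expected * minRatio  // 0 >= 0 * minRatio is always true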

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/li-zhihui/spark fixre4s

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1525.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1525


commit 8b54316c77d086ea3454419ebba92003707bbd76
Author: li-zhihui zhihui...@intel.com
Date:   2014-07-22T08:15:40Z

Fix race condition at SchedulerBackend.isReady in standalone mode




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1525#issuecomment-49714878
  
@kayousterhout @tgravescs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15268735
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after 
the time(milliseconds).
+  // Whatever minRegisteredRatio is arrived, submit tasks after the 
time(milliseconds).
--- End diff --

Thanks @markhamstra, but I think the comment means the time at which tasks 
are submitted if minRegisteredRatio has not been reached.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15268755
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -108,4 +108,8 @@ private[spark] class SparkDeploySchedulerBackend(
 logInfo("Executor %s removed: %s".format(fullId, message))
 removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def checkRegisteredResources(): Boolean = {
--- End diff --

good, thanks @markhamstra 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15268932
  
--- Diff: docs/configuration.md ---
@@ -707,21 +707,22 @@ Apart from these, the following properties are also 
available, and may be useful
  </td>
</tr>
</tr>
-  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
   <td>0</td>
   <td>
-The minimum ratio of registered executors (registered executors / 
total expected executors)
+The minimum ratio of registered resources (registered resources / 
total expected resources)
+(resources are executors in yarn mode, CPU cores in standalone and 
mesos mode)
--- End diff --

Thanks @tgravescs 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-22 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15269102
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after 
the time(milliseconds).
+  // Whatever minRegisteredRatio is arrived, submit tasks after the 
time(milliseconds).
--- End diff --

good, thanks @markhamstra 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2014-07-21 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-49690527
  
Sorry @tgravescs @kayousterhout, I was not aware of the issue's seriousness 
at that time. Thanks @kayousterhout for your coaching.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2014-07-20 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-49572335
  
@tgravescs 

I tested it on a cluster with mesos-0.18.1 (fine-grained and 
coarse-grained); it works well.

I think you are right. In fact, users don't have any idea about the expected 
number of executors in mesos mode (and standalone mode); they only expect CPU 
cores (<code>spark.cores.max</code>). So we need to check the total registered 
executors' cores against <code>spark.cores.max</code> to judge whether the 
SchedulerBackend is ready, and rename 
<code>spark.scheduler.minRegisteredExecutorsRatio</code> to 
<code>spark.scheduler.minRegisteredResourcesRatio</code>.
What do you think? A minimal sketch of that check follows.
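
A minimal sketch of that check (names are illustrative):

    // Standalone/mesos: judge readiness by CPU cores, since the executor
    // count is unknown but spark.cores.max gives the expected core total.
    def sufficientCoresRegistered(registeredCores: Int,
                                  expectedCores: Int,  // from spark.cores.max
                                  minRegisteredRatio: Double): Boolean =
      registeredCores >= expectedCores * minRegisteredRatio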


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2555] Support configuration spark.sched...

2014-07-17 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1462#issuecomment-49284642
  
@tgravescs 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-14 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48869936
  
@tgravescs I added a commit according to the comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14813843
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected 
executors) 
+  // is equal to at least this value, that is double between 0 and 1.
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

@tgravescs Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48714143
  
Thanks @tgravescs 
I will file a new JIRA for handling mesos and follow up on it after the PR is merged.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-29 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47490304
  
@tgravescs @kayousterhout 
It will lead to a logical deadlock in yarn-cluster mode if waitBackendReady 
is in TaskSchedulerImpl.start.

How about moving it (waitBackendReady) to postStartHook()? A sketch of the 
ordering is below.
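
A sketch of the proposed ordering, with stand-in types (not the real Spark 
classes): start() returns immediately, and the blocking wait happens in 
postStartHook(), after the SparkContext is fully constructed.

    trait SchedulerLifecycle {
      def start(): Unit = ()          // must return so executors can register
      def postStartHook(): Unit = ()  // runs once SparkContext construction is done
    }

    class WaitingScheduler(backendReady: () => Boolean) extends SchedulerLifecycle {
      override def postStartHook(): Unit = {
        while (!backendReady()) Thread.sleep(100)  // safe to block here
      }
    }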


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-27 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280510
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

Cool !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-27 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47331430
  
@tgravescs @kayousterhout 
I moved waitBackendReady back to the submitTasks method, because 
waitBackendReady in the start method does not work in yarn-cluster mode 
(NullPointerException because SparkContext initialization times out); 
yarn-client is ok.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14231480
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

Thanks @pwendell @kayousterhout, I am now more thoughtful about this code's 
performance. ^_^ 
But we can't simply inline the code, because executorActor is a member of 
the inner class DriverActor. Although we could get at the member by adding some 
code, I'm not sure it's worth the cost.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47204723
  
@tgravescs @kayousterhout 
I added a new commit:

* Move waitBackendReady to TaskSchedulerImpl.start
* Code refactoring per @kayousterhout's comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14232018
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

@kayousterhout Done. 
As for constants, maybe we can open another PR to manage constants for the 
whole project.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14279974
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains(spark.executor.instances)) {
+  numExecutors = sc.getConf.getInt(spark.executor.instances, 2)
--- End diff --

@kayousterhout There is a rule: system properties override environment 
variables. Eliminating the if would let the environment variable override the 
system property.

https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L62

BTW @tgravescs it seems this code goes against the rule:

https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L36


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280169
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
--- End diff --

@kayousterhout The constant (the default value of numExecutors) is not only 
defined in this class and ApplicationMasterArguments; it is also defined in 
ClientArguments. 

I even suspect the default values below exist for the same reason as 
numExecutors.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232

If we want to turn the value into a constant, we should consider all of them.
So maybe we can add an object org.apache.spark.Constants to manage all 
constants. A hypothetical sketch follows.
@tgravescs 
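
A hypothetical sketch of that suggestion (no such object exists in Spark at 
this point):

    // One shared home for defaults that are currently duplicated across
    // ClientArguments, ApplicationMasterArguments, and the backends.
    object Constants {
      val DefaultNumExecutors = 2
    }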


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280444
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

If we set numExecutors based on SPARK_EXECUTOR_INSTANCES first, then because 
sc.getConf.getInt("spark.executor.instances", 2) always returns a value, the 
SPARK_EXECUTOR_INSTANCES setting would be overridden no matter whether 
spark.executor.instances is configured. The sketch below shows the pitfall.
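
A sketch of the pitfall (the commented-out line is the broken variant):

    import org.apache.spark.SparkConf

    def numExecutors(conf: SparkConf): Int = {
      var n = sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(2)
      // n = conf.getInt("spark.executor.instances", 2)  // WRONG: the default
      //                                                 // always overwrites n
      if (conf.contains("spark.executor.instances")) {
        n = conf.getInt("spark.executor.instances", 2)  // property wins only when set
      }
      n
    }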


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225172
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

@kayousterhout If we removed the creation of ApplicationMasterArguments, we 
would have to assign the default value (=2) of numExecutors in this class, 
which would lead to a duplicated setting, unless we extract the value as a 
static constant. 
BTW, I think constant references are a little confusing in Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225319
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

@kayousterhout Right now the method is called on every task submission, so it 
can return quickly by caching the value in ready.  
If we moved waitBackendReady to SchedulerBackend.start, the method would be 
called only once, and we should follow that idea.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225397
  
--- Diff: 
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
---
@@ -164,6 +164,7 @@ class ApplicationMaster(args: 
ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
 logInfo("Starting the user JAR in a separate Thread")
+System.setProperty("spark.executor.instances", 
args.numExecutors.toString)
--- End diff --

It's for yarn-cluster mode.
In yarn-cluster mode, the driver runs in a YARN container and loses the system 
properties that were set on the client.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225520
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: 
String, cores: Int,
 memory: Int) {
+totalExecutors.addAndGet(1)
--- End diff --

Yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47188350
  
@tgravescs @kayousterhout 
How about moving waitBackendReady to TaskSchedulerImpl.start? It would be 
called only once, at Spark initialization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-24 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14166951
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
  val maxRegisteredWaitingTime = 
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 3)
--- End diff --

@tgravescs added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-24 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47061333
  
@tgravescs 
I added a new commit that moves waitBackendReady from DAGScheduler.submitStage 
to TaskSchedulerImpl.submitTasks, for 2 reasons:

* Performance: some of the work (creating the TaskSet, building task 
preferred locality...) is independent of the executors and can be done before 
waiting for the backend to be ready.
* Responsibility: waiting for the backend to be ready seems to be the 
responsibility of the TaskScheduler, not the DAGScheduler (see the sketch after 
this list).
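
A sketch of the resulting ordering, with stand-in types:

    // Placeholder task type; backendReady stands in for isReady().
    def submitTasks(tasks: Seq[Runnable], backendReady: () => Boolean): Unit = {
      // 1. Executor-independent work: build the TaskSetManager, locality tables...
      val prepared = tasks
      // 2. Only now block until the backend has enough resources registered.
      while (!backendReady()) Thread.sleep(100)
      // 3. Hand the prepared tasks over (stand-in for reviveOffers).
      prepared.foreach(_.run())
    }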
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-22 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14060589
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

@kayousterhout Here ApplicationMasterArguments is used to get the default 
value of numExecutors (it's 2, for now).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-20 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14010613
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready){
+  return true
+}
+if ((System.currentTimeMillis() - createTime) >= 
maxRegisteredWaitingTime) {
+  ready = true
+  return true
+}
+return false
--- End diff --

Thanks @CrazyJvm, I made a mistake; the last return is not necessary.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-20 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-46655979
  
Thanks @tgravescs 
I added a new commit:
* code style
* default minRegisteredRatio = 0 in yarn mode
* driver gets --num-executors in yarn/alpha



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-20 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-46657630
  
@tgravescs @mridulm 
In my test case (4 nodes, 128 executors), it took 25 seconds to register all 
executors.
Right now maxRegisteredWaitingTime = 10 seconds; I think that's not enough. An 
illustrative way to raise it is shown below.
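
Illustrative only (property name as it stands in this PR; the value is in 
milliseconds):

    import org.apache.spark.SparkConf

    // Give slow-registering clusters more headroom than the default.
    val conf = new SparkConf()
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "60000")  // 60s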



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2193] Improve tasks preferrd locality b...

2014-06-19 Thread li-zhihui
GitHub user li-zhihui opened a pull request:

https://github.com/apache/spark/pull/1131

[SPARK-2193] Improve tasks preferrd locality by sorting tasks partial or...

Currently, the last executor(s) may not get their preferred task(s), although 
these tasks have been built into the pendingTasksForHosts map, because 
executors pick up tasks sequentially and their preferred task(s) may be picked 
up by other executors.
This behaviour can be eliminated by sorting tasks with a partial ordering: an 
executor picks up tasks by the rank of its host in each task's 
preferredLocations, that is, it first picks up all tasks with 
task.preferredLocations.1 = executor.hostName, then secondly…

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/li-zhihui/spark sorttasks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1131


commit a38d5f5baaae54d6e6d9a9e463c265ef4e5fc968
Author: li-zhihui zhihui...@intel.com
Date:   2014-06-19T08:09:09Z

[SPARK-2193] Improve tasks preferred locality by sorting tasks partial ordering.






[GitHub] spark pull request: [SPARK-2193] Improve tasks preferred locality b...

2014-06-19 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/1131#issuecomment-46540777
  
@mridulm 
For example:
2 tasks (task_x, task_y), 2 executors (host1, host2)
task_x.preferredLocations = [host2, host3, host1]
task_y.preferredLocations = [host1, host3, host4]
task_x exists in the pendingTasks arrays of host1, host2 and host3;
task_y exists in the pendingTasks arrays of host1, host3 and host4.
If host1 picks up task_x, and task_x is the last pending task of host2,
then host2 can't get any host-preferred task (and must wait 3 seconds to 
get task_y).
If host1 picks up task_y and leaves task_x to host2,
then both host1 and host2 get a preferred task.
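
Running the toy Task/tasksForHost sketch from the PR description over this example shows the ordering (assumed helper, not the real scheduler):

    val taskX = Task(0, Seq("host2", "host3", "host1"))
    val taskY = Task(1, Seq("host1", "host3", "host4"))
    val pending = Seq(taskX, taskY)

    tasksForHost(pending, "host1")  // Seq(taskY, taskX): host1 prefers task_y
    tasksForHost(pending, "host2")  // Seq(taskX): task_x is left for host2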




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-17 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r13851868
  
--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
@@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(

     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
+    totalExecutors.set(args.numExecutors)
--- End diff --

Thanks @tgravescs 




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-17 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-46291702
  
@tgravescs I added a commit to support yarn-cluster.

One small issue: YarnClusterSchedulerBackend currently can't pick up 
--num-executors as totalExecutors (setting it via spark-defaults.conf works).
I will follow up on it.




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-17 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r13895415
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -225,6 +232,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
+      return true
+    }
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+      ready = true
+      return true
+    }
+    return false
--- End diff --

Thanks @adrian-wang, but I think it's necessary to return true quickly, 
because ready is true most of the time.




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

2014-06-16 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r13793268
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
--- End diff --

I discarded the commit.




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured nu...

2014-06-13 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r13740596
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -48,6 +48,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  val minRegisteredNum = conf.getDouble("spark.executor.minRegisteredNum", 0)
--- End diff --

@tgravescs 
I think the default value of the config property should be 0, to keep behavior 
consistent with previous versions, whether we use a number or a percentage.
In my opinion, an executor-number config property stays consistent with the 
--num-executors argument (and its conf/env equivalents), so users can pick up 
the new property more easily.
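
A sketch of the two candidate knobs with backward-compatible defaults (conf is the backend's SparkConf; the ratio property name is assumed here, not necessarily what was merged):

    // Absolute count, mirroring --num-executors; 0 = old behavior.
    val minRegisteredNum =
      conf.getDouble("spark.executor.minRegisteredNum", 0)
    // Percentage variant; 0.0 likewise disables the wait, capped at 1.0.
    val minRegisteredRatio =
      math.min(1.0, conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0.0))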




[GitHub] spark pull request: [SPARK-1946] Submit stage after (configured ra...

2014-06-12 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r13688559
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -431,6 +431,16 @@ private[spark] class TaskSchedulerImpl(

   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
+  override def waitBackendReady(): Unit = {
+    if (backend.isReady) {
+      return
+    }
+    while (!backend.isReady) {
+      synchronized {
+        this.wait(100)
--- End diff --

Just for programming simplicity. :)
If someone wants to add more backend implementations or change 
backend.isReady somewhere, they needn't call notify().

But waiting 100 milliseconds is maybe too long; is 10 milliseconds OK?
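
The polling variant under discussion, with the interval factored out so the 100 ms vs 10 ms question becomes a one-line change (a sketch, assuming the backend field from the diff):

    // Busy-wait: simple, and no backend ever has to call notify(), at the
    // cost of up to one pollIntervalMs of extra latency at startup.
    override def waitBackendReady(): Unit = {
      val pollIntervalMs = 100L  // the value being debated; 10 would poll harder
      while (!backend.isReady) {
        synchronized {
          this.wait(pollIntervalMs)
        }
      }
    }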



