Address review comments: fix code formatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4c22c55a Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4c22c55a Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4c22c55a Branch: refs/heads/master Commit: 4c22c55ad6900433014c36f8c025645c3e261c43 Parents: 161ab93 Author: Raymond Liu <raymond....@intel.com> Authored: Fri Jan 10 09:44:44 2014 +0800 Committer: Raymond Liu <raymond....@intel.com> Committed: Tue Jan 14 10:41:42 2014 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/deploy/yarn/Client.scala | 9 ++++----- .../org/apache/spark/deploy/yarn/ClientBase.scala | 13 ++++++------- .../apache/spark/deploy/yarn/WorkerRunnableUtil.scala | 11 ++++++----- .../scala/org/apache/spark/deploy/yarn/Client.scala | 13 ++++++------- 4 files changed, 22 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4c22c55a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e58a926..71a64ec 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -110,15 +110,15 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext } - def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = { - val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() + def calculateAMMemory(newApp: GetNewApplicationResponse): Int = { + val minResMemory = newApp.getMinimumResourceCapability().getMemory() val amMemory = 
((args.amMemory / minResMemory) * minResMemory) + ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - YarnAllocationHandler.MEMORY_OVERHEAD) + YarnAllocationHandler.MEMORY_OVERHEAD) amMemory } - def setupSecurityToken(amContainer :ContainerLaunchContext) = { + def setupSecurityToken(amContainer: ContainerLaunchContext) = { // Setup security tokens. val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) @@ -154,7 +154,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa ) val state = report.getYarnApplicationState() - val dsStatus = report.getFinalApplicationStatus() if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4c22c55a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 96e998a..2db5744 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -54,8 +54,6 @@ trait ClientBase extends Logging { val args: ClientArguments val conf: Configuration val sparkConf: SparkConf - - //var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" @@ -140,9 +138,10 @@ trait ClientBase extends Logging { } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. 
*/ @@ -169,7 +168,7 @@ trait ClientBase extends Logging { destPath } - def qualifyForLocal(localURI : URI): Path = { + def qualifyForLocal(localURI: URI): Path = { var qualifiedURI = localURI // If not specified assume these are in the local filesystem to keep behavior like Hadoop if (qualifiedURI.getScheme() == null) { @@ -296,9 +295,9 @@ trait ClientBase extends Logging { retval.toString } - def calculateAMMemory(newApp: GetNewApplicationResponse) :Int + def calculateAMMemory(newApp: GetNewApplicationResponse): Int - def setupSecurityToken(amContainer :ContainerLaunchContext) + def setupSecurityToken(amContainer: ContainerLaunchContext) def createContainerLaunchContext( newApp: GetNewApplicationResponse, http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4c22c55a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala index 3c9379d..bfa8f84 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala @@ -45,11 +45,12 @@ trait WorkerRunnableUtil extends Logging { val sparkConf: SparkConf lazy val env = prepareEnvironment - def prepareCommand(masterAddress: String, - slaveId: String, - hostname: String, - workerMemory: Int, - workerCores: Int) = { + def prepareCommand( + masterAddress: String, + slaveId: String, + hostname: String, + workerMemory: Int, + workerCores: Int) = { // Extra options for the JVM var JAVA_OPTS = "" // Set the JVM memory http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4c22c55a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index fef4702..837b7e1 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,11 +108,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue) logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s, queueApplicationCount = %s, queueChildQueueCount = %s""".format( - queueInfo.getQueueName, - queueInfo.getCurrentCapacity, - queueInfo.getMaximumCapacity, - queueInfo.getApplications.size, - queueInfo.getChildQueues.size)) + queueInfo.getQueueName, + queueInfo.getCurrentCapacity, + queueInfo.getMaximumCapacity, + queueInfo.getApplications.size, + queueInfo.getChildQueues.size)) } def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = { @@ -124,7 +124,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa args.amMemory } - def setupSecurityToken(amContainer :ContainerLaunchContext) = { + def setupSecurityToken(amContainer: ContainerLaunchContext) = { // Setup security tokens. val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) @@ -160,7 +160,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa ) val state = report.getYarnApplicationState() - val dsStatus = report.getFinalApplicationStatus() if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {