GitHub user chesterxgchen opened a pull request:
https://github.com/apache/spark/pull/6398
Spark Yarn Client API Improvement/Requirements
The problem description
The following description of the problem and requests focuses mainly on Spark
applications on Yarn, in particular Spark in yarn-cluster mode.
Use Case:
Our application does not use the spark-submit command line to run Spark. We
submit both Hadoop and Spark jobs directly from our servlet application (Jetty).
We deploy in yarn-cluster mode and invoke the Spark Client (in the yarn module)
directly. The Client must not call System.exit(), as that would shut down the
Jetty JVM.
Our application submits and stops Spark jobs, monitors job progress, gets
state from the Spark jobs (for example, bad-data counters), and collects logging
and exceptions. So far the communication is one-way after the job is submitted;
we will move to two-way communication soon (for example, a long-running Spark
context with different short Spark jobs or actions for interactive analysis and
visualization).
In our use cases, we have the following requirements
Requirements:
1) Get Yarn container capacities before submitting Yarn applications.
Before Spark 1.3.1, Spark required the user to specify the memory and number
of executors before starting the job. For a Yarn application, if the requested
memory exceeds the Yarn container capacity, the job simply gets killed. So we
try to get the Yarn container capacity before starting the Spark job. We use
the information in two ways:
A) We cap the requested memory if it is too large. For example, if the
spark.executor.memory supplied by the client is larger than the Yarn maximum
container memory, we reset spark.executor.memory to the Yarn maximum container
memory minus the overhead and send a message to the application (a UI message)
telling the user that we reset the memory; alternatively, we could simply throw
an exception without submitting the job (see the sketch after this requirement).
Users might also use the information about virtual cores for other
validation.
B) We can dynamically estimate the executor memory based on the data size (if
that information is available from previous processing steps) and the maximum
memory available, rather than using a fixed memory size and potentially getting
killed if it is too large.
This requirement could be eliminated by the Spark 1.3.1 dynamic resource
allocation feature, but we haven't tried that yet.
2) Add callbacks via a listener to monitor Yarn application progress.
So far, a Spark job in yarn-cluster mode is essentially a batch job: no
progress is reported back to the user.
We would like a callback listener that provides feedback during the Yarn
application lifecycle (initialization, in progress, complete, killed, etc.),
even though the information is very limited.
A) We can get the tracking URL from the Yarn application listener callback.
The URL lets the client go directly to the Hadoop cluster application
management page to check the job status.
As soon as the Yarn container is created and the job is submitted, we have a
tracking URL from Yarn (we need to watch out for invalid URLs); at this point we
can put the URL in the UI, even though the Spark job has not started yet.
B) We display a progress bar in the UI using the callback.
For example, on CDH5 we only get 0%, 10%, and 100% from Yarn; not very
useful, but still some early feedback to the customer.
C) We get the Yarn application ID when the Spark job is submitted, which can
be used to track progress or kill the application.
3) Expose a Yarn kill-application API.
The Spark Yarn Client has a stop method, but it doesn't seem to work. We
need a killApplication() method that simply kills the Yarn application with the
given application ID.
Yes, you can kill it directly from the command line with
yarn application -kill <applicationId>.
But since we need to call it from our application, we need an API to do this.
In our application, if the client starts a job and then decides to stop it
(running too long, changed parameters, etc.), we have to use the kill API, as
the stop API doesn't stop it.
4) Be able to show the user Spark job progress.
When the Spark job starts to run, the application would like to show a
progress bar or stage status (similar to the Spark UI) in the user's
application.
This means we need to redirect the Spark job status (reported by the Spark
job listener) to the end application.
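As a concrete illustration of this requirement (not part of the diff), a
SparkListener can relay stage and job events to the embedding application; the
report callback below is a placeholder for whatever mechanism forwards text to
the client UI.
<code>
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

// Relays coarse progress information to a caller-supplied callback.
class JobProgressRelay(report: String => Unit) extends SparkListener {

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    report(s"stage ${info.stageId} '${info.name}' completed (${info.numTasks} tasks)")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    report(s"job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
  }
}

// usage (forwardToClientUi is hypothetical):
//   sc.addSparkListener(new JobProgressRelay(msg => forwardToClientUi(msg)))
</code>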
5) Be able to log Spark job print/error/exception output back to clients for
easy debugging.
When a Spark job runs, the print statements and logs are captured in the Yarn
container log, but they are not communicated back to the client. Users usually
read the log on the cluster application management page before the job stops.
But once the job stops, the container is destroyed and the logs are no longer
available to the end user. Although the log history is still in the Hadoop
cluster, it is hard for the end user to get it out.
This requires that all print or error logs be redirected to the client
application's log, where they can easily be read by the end user.
6) Support long input arguments.
The current Spark job inputs are in the form of command-line arguments. In
our application, we run into the command-line argument length limit. When we
run our application with a large number of column names, we serialize the input
into a string and pass it as an argument, but the string is too long for a
command-line argument.
One solution is to simply use HDFS, i.e. write the argument to an HDFS file,
pass the path in the Spark conf, and load the file into memory as input before
the Spark job starts (a sketch is shown below).
But this doesn't seem to be the best option, as every Spark job then needs
to write to HDFS first, read it back in the cluster, and delete it when the job
is finished or killed.
We need an alternative channel to pass the arguments.
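For clarity, here is a minimal sketch of the HDFS workaround described above;
the configuration key spark.app.extraArgsPath and the file location are
illustrative assumptions, not part of this PR.
<code>
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf

def writeArgs(fs: FileSystem, path: Path, args: String): Unit = {
  val out = fs.create(path, true)
  try out.write(args.getBytes(StandardCharsets.UTF_8)) finally out.close()
}

def readArgs(fs: FileSystem, path: Path): String = {
  val in = fs.open(path)
  try scala.io.Source.fromInputStream(in, "UTF-8").mkString finally in.close()
}

// Submitter side: write the oversized argument string to HDFS and pass only the path.
val fs = FileSystem.get(new Configuration())
val argsPath = new Path("/tmp/myapp/args.json")           // illustrative location
val serializedColumns = """["col1","col2","col3"]"""      // stands in for the long argument
writeArgs(fs, argsPath, serializedColumns)

val sparkConf = new SparkConf()
sparkConf.set("spark.app.extraArgsPath", argsPath.toString)  // hypothetical key

// Driver side (in the cluster): read the arguments back before the job logic starts,
// and remember to delete the file when the job finishes or is killed.
val extraArgs = readArgs(fs, new Path(sparkConf.get("spark.app.extraArgsPath")))
</code>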
7) Create a communication channel that allows running interactive commands.
Pull Request Design
Part 1 -- originally SPARK-3913
Part 1 of the design addresses the resource capacity and Yarn application
listener requirements, as well as the kill-application API.
Since the Spark 1.3.1 dynamic resource allocation feature addresses the
resource allocation issue, we don't need to address requirement #1 (resource
capacity).
The main changes are the following:
<code>
case class YarnAppInfo(appId: ApplicationId,
                       user: String,
                       queue: String,
                       name: String,
                       masterHost: String,
                       masterRpcPort: Int,
                       state: String,
                       diagnostics: String,
                       trackingUrl: String,
                       startTime: Long)

sealed trait YarnApplicationEvent

case class YarnApplicationStart(time: Long) extends YarnApplicationEvent
case class YarnApplicationProgress(time: Long, progress: YarnAppProgress) extends YarnApplicationEvent
case class YarnApplicationEnd(time: Long) extends YarnApplicationEvent

trait YarnApplicationListener {
  def onApplicationInit(time: Long, appId: ApplicationId)
  def onApplicationStart(time: Long, info: YarnAppInfo)
  def onApplicationProgress(time: Long, progress: YarnAppProgress)
  def onApplicationEnd(time: Long, progress: YarnAppProgress)
  def onApplicationFailed(time: Long, progress: YarnAppProgress)
  def onApplicationKilled(time: Long, progress: YarnAppProgress)
}

case class YarnAppResource(memory: Int, virtualCores: Int)

case class YarnResourceUsage(numUsedContainers: Int,
                             numReservedContainers: Int,
                             usedResource: YarnAppResource,
                             reservedResource: YarnAppResource,
                             neededResource: YarnAppResource)

case class YarnAppProgress(appId: ApplicationId,
                           trackingUrl: String,
                           usage: YarnResourceUsage,
                           progress: Float = 0)
</code>
In Client.scala:
<code>
private val listeners = ListBuffer[YarnApplicationListener]()

def killApplication(appId: ApplicationId) = {
  yarnClient.killApplication(appId)
}

def monitorApplication(appId: ApplicationId,
                       returnOnRunning: Boolean = false,
                       logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
  val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
  val initialReport = getApplicationReport(appId)
  var lastState: YarnApplicationState = null
  while (true) {
    Thread.sleep(interval)
    val report: ApplicationReport = ... // code skipped
    val state = report.getYarnApplicationState
    state match {
      case YarnApplicationState.RUNNING =>
        notifyAppProgress(report)
      case YarnApplicationState.FINISHED =>
        notifyAppFinished(report)
      case YarnApplicationState.FAILED =>
        notifyAppFailed(report)
      case YarnApplicationState.KILLED =>
        notifyAppKilled(report)
      case _ =>
        notifyAppProgress(report)
    }
    // code skipped ...
  }
}
</code>
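To show how an embedding application might consume these callbacks, here is a
sketch covering requirements 2A-2C. This usage is not part of the diff, and how
the listener gets registered with the Client's listeners buffer is assumed.
<code>
import org.apache.hadoop.yarn.api.records.ApplicationId

// Hypothetical listener: capture the tracking URL (2A), drive a progress bar (2B),
// and record the application id for later tracking or kill (2C).
class UiRelayListener extends YarnApplicationListener {

  @volatile var appId: Option[ApplicationId] = None

  override def onApplicationInit(time: Long, id: ApplicationId): Unit = {
    appId = Some(id)                                    // 2C: keep the id for kill/tracking
  }
  override def onApplicationStart(time: Long, info: YarnAppInfo): Unit = {
    println(s"tracking URL: ${info.trackingUrl}")       // 2A: surface the URL in the UI
  }
  override def onApplicationProgress(time: Long, p: YarnAppProgress): Unit = {
    println(f"progress: ${p.progress * 100}%.0f%%")     // 2B: update the progress bar
  }
  override def onApplicationEnd(time: Long, p: YarnAppProgress): Unit = println("finished")
  override def onApplicationFailed(time: Long, p: YarnAppProgress): Unit = println("failed")
  override def onApplicationKilled(time: Long, p: YarnAppProgress): Unit = println("killed")
}
</code>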
Part 2 -- Create a communication channel to address requirements #4-7
The main ideas of the design are the following:
1) Create a communication channel before the Spark job starts. The client
passes the connection URL (host and port), and the Spark job tries to establish
the connection (ping-pong) before launching the Spark job.
2) All logging (print, error) statements are also sent back and logged in the
application log (outside the cluster).
3) The application can send different types of messages to the client
application (one possible shape for these message classes is sketched after the
ChannelMessageLogger code below):
LogMessage -- Info, Error, Warn, and Debug messages are redirected to the
corresponding application log.
AppMessage -- messages redirected to the application. For example, an
UpdateMessage tells the application program to update the corresponding job
state.
VisualMessage -- messages that tell the client application that the content
is to be used for visual display.
4) Every Spark job implements a trait that lets it compose pre- and post-
Spark-job actions.
Here is the main code:
<code>
trait YarnSparkApp {

  // all spark jobs should implement the sparkMain method
  def sparkMain(appCtx: ApplicationContext)

  // this is called by Spark Yarn Client.run(conf)
  def run(conf: SparkConf) {
    var logger = new ChannelMessageLogger("spark app", None)
    logTime { // capture the runtime
      var failed = false
      var appContext: Option[ApplicationContext] = None
      try {
        // initialize the application context
        appContext = Some(new ApplicationContext(conf))
        // switch to the context's logger
        logger = appContext.get.logger
        logger.logInfo(s"starting ${appContext.get.appName}")
        sparkMain(appContext.get)
      } catch {
        case e: Throwable =>
          failed = true
          val t = wrapThrowable(appContext, e)
          printStackTrace(t)
          throw t
      } finally {
        if (!failed) {
          logger.sendUpdateMessage(SPARK_APP_COMPLETED, true)
          logger.logInfo(s"spark app finished.")
        }
        waitToFinish(failed)
        appContext.foreach(_.stop())
        appContext.foreach(_.restoreConsoleOut())
      }
      // ....
    } // end logTime
  }   // end run
}
</code>
The run() method first initializes the ApplicationContext, which establishes
the communication channel, prepares the logger, and fetches the extra arguments
(such as the wide column arguments, if any), and then runs
sparkMain(appCtx: ApplicationContext).
Once the job fails or completes, the finally clause performs the cleanup.
If the job completed, an UpdateMessage is sent to the application side to
indicate that the Spark app completed; otherwise, the application considers the
job to have terminated abnormally.
If the job failed, or for debugging purposes (via configuration), the finally
clause calls
waitToFinish(failed)
so the user can check the log before the container is destroyed.
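As a usage illustration (not part of the diff), a Spark job built on this trait
only needs to implement sparkMain; the input path below is made up.
<code>
object WordCountApp extends YarnSparkApp {

  override def sparkMain(appCtx: ApplicationContext): Unit = {
    val sc = appCtx.sparkCtx
    val counts = sc.textFile("hdfs:///tmp/input.txt")    // illustrative input
      .flatMap(_.split("\\s+"))
      .map((_, 1L))
      .reduceByKey(_ + _)
    appCtx.logger.logInfo(s"distinct words: ${counts.count()}")
  }
}
</code>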
Here is the ApplicationContext:
<code>
class ApplicationContext(val conf: SparkConf) {

  import ApplicationContext._

  val appName = conf.get(SPARK_APP_NAME, "Spark Application")
  val deployMode = conf.get(SPARK_DEPLOY_MODE)
  val sparkCtx: SparkContext = new SparkContext(deployMode, appName, conf)
  val messenger = createAppChannelMessenger()
  val logger = new ChannelMessageLogger(appName, Some(this))
  val stdOut = Console.out
  val stdErr = Console.err

  // add spark listener that relays job progress back to the client
  sparkCtx.addSparkListener(new JobProgressRelayListener(this))

  // redirect console output so print statements reach the client log as well
  Console.setOut(new RedirectPrintStream(logger, stdOut))
  Console.setErr(new RedirectPrintStream(logger, stdErr))

  // todo: get extra arguments from the client; not implemented
  // todo: call the method here

  // show the spark conf if requested
  showConf()

  def showConf(): Unit = {
    if (conf.get(SPARK_SHOW_CONF, "true").toBoolean) {
      println(conf.toDebugString)
    }
  }

  def restoreConsoleOut() = {
    Console.setOut(stdOut)
    Console.setErr(stdErr)
  }

  def addSparkListener(listener: SparkListener) {
    sparkCtx.addSparkListener(listener)
  }

  def stop() {
    println("[ApplicationContext] stopping ")
    CommunicationHelper.stopRelayMessenger(Some(sparkCtx), messenger)
    sparkCtx.stop()
  }

  private def createAppChannelMessenger(): ChannelMessenger =
    CommunicationHelper.createRelayMessenger(this)
}
</code>
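RedirectPrintStream is referenced above but not included in this description; a
minimal sketch of what such a redirect might do (the exact class is an
assumption) is:
<code>
import java.io.{ByteArrayOutputStream, PrintStream}

// Writes to the original console stream (so the Yarn container log is unchanged)
// and relays each printed line back to the client through the channel logger.
class RedirectPrintStream(logger: ChannelMessageLogger, original: PrintStream)
  extends PrintStream(new ByteArrayOutputStream(), true) {

  override def println(line: String): Unit = {
    original.println(line)
    logger.logInfo(line)
  }
}
</code>
JobProgressRelayListener is not shown either; it would presumably resemble the
SparkListener relay sketched under requirement 4 above.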
The communication channel
In our design, we leverage Akka for the messaging channel.
<code>
private[server] def createRelayMessenger(appCtx: ApplicationContext): ChannelMessenger = {
  val protocol = appCtx.conf.get(SPARK_APP_CHANNEL_PROTOCOL, AKKA)
  protocol match {
    case AKKA  => AkkaChannelUtils.createRelayMessenger(appCtx)
    case NETTY => NettyMessenger(None)
    case _     => ActorMessenger(None)
  }
}
</code>
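AkkaChannelUtils and ChannelMessenger are not shown here; a purely illustrative
sketch of an Akka-backed messenger, assuming a sendMessage(sc, message)
interface as used by ChannelMessageLogger below and a client relay actor
address passed through SparkConf under a hypothetical key spark.app.channel.url,
could look like this:
<code>
import akka.actor.{ActorSelection, ActorSystem}
import org.apache.spark.{SparkConf, SparkContext}

trait ChannelMessenger {
  def sendMessage(sc: SparkContext, message: Any): Unit
}

class AkkaChannelMessenger(conf: SparkConf) extends ChannelMessenger {

  private val system = ActorSystem("spark-app-channel")

  // e.g. "akka.tcp://client@host:2552/user/relay" on the submitting application's side
  private val relay: ActorSelection = system.actorSelection(conf.get("spark.app.channel.url"))

  // Fire-and-forget: log, app, and visual messages are all relayed to the client actor.
  override def sendMessage(sc: SparkContext, message: Any): Unit = relay ! message

  def shutdown(): Unit = system.shutdown()
}
</code>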
<code>
case class ChannelMessageLogger(appName: String = "", appCtx: Option[ApplicationContext]) {

  def logInfo(message: String) {
    sendMessage(InfoMessage(message, appName))
  }

  def logError(message: String, e: Throwable) {
    sendMessage(ErrorMessage(message, appName, e))
  }

  def logWarn(message: String) {
    sendMessage(WarnMessage(message, appName))
  }

  def logDebug(message: String) {
    sendMessage(DebugMessage(message, appName))
  }

  def sendVisualMessage(message: VisualMessage) {
    sendMessage(message)
  }

  def sendUpdateMessage(key: String, value: Any) {
    sendMessage(UpdateMessage(appName, key, value))
  }

  private def sendMessage(message: Any) {
    if (appCtx.isDefined) {
      val sc = appCtx.map(_.sparkCtx)
      val messenger = appCtx.map(_.messenger)
      if (sc.isDefined) {
        messenger.foreach(m => m.sendMessage(sc.get, message))
      } else {
        printlnMessage(message)
      }
    } else {
      printlnMessage(message)
    }
  }

  def printlnMessage(message: Any): Unit = {
    message match {
      case err @ ErrorMessage(msg, name, cause, time) =>
        Console.err.println(err.toString)
        cause.printStackTrace(Console.err)
      case _ =>
        println(message.toString)
    }
  }
}
</code>
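The message classes themselves (InfoMessage, ErrorMessage, UpdateMessage, etc.)
are not included in this description; one possible shape, consistent with the
constructor calls and the pattern match above, might be the following (the
field order and the time default are assumptions):
<code>
sealed trait ChannelMessage

// LogMessage family
case class InfoMessage(message: String, appName: String) extends ChannelMessage
case class WarnMessage(message: String, appName: String) extends ChannelMessage
case class DebugMessage(message: String, appName: String) extends ChannelMessage
case class ErrorMessage(message: String, appName: String, cause: Throwable,
                        time: Long = System.currentTimeMillis()) extends ChannelMessage

// AppMessage family: tells the client application to update job state
case class UpdateMessage(appName: String, key: String, value: Any) extends ChannelMessage

// VisualMessage family: content intended for visual display on the client side
trait VisualMessage extends ChannelMessage
</code>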
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/AlpineNow/spark alpine-pr-master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/6398.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6398
----
commit d8f7f6b587d7eb0133de0649d28c816607761b99
Author: chesterxgchen <[email protected]>
Date: 2015-05-25T03:23:29Z
1) Add killApplication()
2) Add YarnAppProgress, YarnAppResource, YarnAppInfo, YarnResourceUsage, and
YarnApplicationListener classes
3) Add method notifications based on Yarn State, Progress, Killed etc.
4) Add communication channel
5) add ApplicationContext
6) add YarnAppMain
----