Repository: spark
Updated Branches:
  refs/heads/master 7f16affa2 -> 6a68c5d7b


[SPARK-16757] Set up Spark caller context to HDFS and YARN

## What changes were proposed in this pull request?

1. Pass `jobId`, `appId`, and `appAttemptId` to `Task`.
2. Invoke Hadoop APIs.
    * A new class `CallerContext` is added in `Utils.scala`. Its `setCurrentContext()` method invokes `org.apache.hadoop.ipc.CallerContext` (via reflection) to set up Spark caller contexts, which will be written into `hdfs-audit.log` and the Yarn RM audit log (see the sketch after this list).
    * For HDFS: Spark sets up its caller context by invoking `org.apache.hadoop.ipc.CallerContext` in `Task`, the Yarn `Client`, and the `ApplicationMaster`.
    * For Yarn: Spark sets up its caller context by invoking `org.apache.hadoop.ipc.CallerContext` in the Yarn `Client`.
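
For reference, here is a minimal sketch of what that call amounts to when written directly against the Hadoop 2.8+ API (the helper name `setSparkCallerContext` is only illustrative; the committed code reaches the same methods through reflection so Spark still builds against Hadoop versions where `org.apache.hadoop.ipc.CallerContext` does not exist):
```
import org.apache.hadoop.ipc.CallerContext

// Build a Hadoop CallerContext from the Spark-specific string and install it on the
// current thread, so subsequent HDFS RPCs carry it into the NameNode audit log.
def setSparkCallerContext(context: String): Unit = {
  CallerContext.setCurrent(new CallerContext.Builder(context).build())
}
```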

## How was this patch tested?
Manual tests against Spark applications in Yarn client mode and Yarn cluster 
mode, checking that Spark caller contexts are written into HDFS 
`hdfs-audit.log` and the Yarn RM audit log.

For example, run SparkKMeans in Yarn client mode:
```
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn 
--deploy-mode client --class org.apache.spark.examples.SparkKMeans 
examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 
hdfs://localhost:9000/lr_big.txt 2 5
```

**Before**:
There is no Spark caller context in the records of `hdfs-audit.log` or the 
Yarn RM audit log.

**After**:
Spark caller contexts are written into the records of `hdfs-audit.log` and the 
Yarn RM audit log.

These are records in `hdfs-audit.log`:
```
2016-09-20 11:54:24,116 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_CLIENT_AppId_application_1474394339641_0005
2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_2_AttemptNum_0
2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_1_AttemptNum_0
2016-09-20 11:54:28,164 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0005_JobId_0_StageId_0_AttemptId_0_TaskId_0_AttemptNum_0
```
```
2016-09-20 11:59:33,868 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=mkdirs      src=/private/tmp/hadoop-wyang/nm-local-dir/usercache/wyang/appcache/application_1474394339641_0006/container_1474394339641_0006_01_000001/spark-warehouse       dst=null        perm=wyang:supergroup:rwxr-xr-x proto=rpc       callerContext=SPARK_APPLICATION_MASTER_AppId_application_1474394339641_0006_AttemptId_1
2016-09-20 11:59:37,214 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_1_AttemptNum_0
2016-09-20 11:59:37,215 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_2_AttemptNum_0
2016-09-20 11:59:37,215 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_0_AttemptNum_0
2016-09-20 11:59:42,391 INFO FSNamesystem.audit: allowed=true   ugi=wyang (auth:SIMPLE) ip=/127.0.0.1   cmd=open        src=/lr_big.txt dst=null        perm=null       proto=rpc       callerContext=SPARK_TASK_AppId_application_1474394339641_0006_AttemptId_1_JobId_0_StageId_0_AttemptId_0_TaskId_3_AttemptNum_0
```
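
Since every record now carries a `callerContext=SPARK_...` field, audit entries can be attributed back to specific Spark applications, jobs, stages, and tasks. As a rough illustration (not part of this patch; the log path and application id below are only examples), the records for one application could be aggregated like this, e.g. from the Scala REPL:
```
import scala.io.Source

val appId = "application_1474394339641_0006"       // example application id
val auditLog = "/var/log/hadoop/hdfs-audit.log"    // example path; adjust for your cluster

// Count HDFS audit events per Spark caller context for the given application.
val counts = Source.fromFile(auditLog).getLines()
  .flatMap(_.split("\\s+").find(_.startsWith("callerContext=SPARK_")))
  .filter(_.contains(appId))
  .toSeq
  .groupBy(identity)
  .map { case (ctx, occurrences) => ctx -> occurrences.size }

counts.toSeq.sortBy(-_._2).foreach { case (ctx, n) => println(f"$n%6d  $ctx") }
```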
This is a record in the Yarn RM audit log:
```
2016-09-20 11:59:24,050 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=wyang    IP=127.0.0.1    OPERATION=Submit Application Request    TARGET=ClientRMService  RESULT=SUCCESS  APPID=application_1474394339641_0006    CALLERCONTEXT=SPARK_CLIENT_AppId_application_1474394339641_0006
```

Author: Weiqing Yang <yangweiqing...@gmail.com>

Closes #14659 from Sherry302/callercontextSubmit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a68c5d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a68c5d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a68c5d7

Branch: refs/heads/master
Commit: 6a68c5d7b4eb07e4ed6b702dd1536cd08d9bba7d
Parents: 7f16aff
Author: Weiqing Yang <yangweiqing...@gmail.com>
Authored: Tue Sep 27 08:10:38 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Tue Sep 27 08:10:38 2016 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  6 +-
 .../org/apache/spark/scheduler/ResultTask.scala | 15 ++++-
 .../apache/spark/scheduler/ShuffleMapTask.scala | 13 +++-
 .../scala/org/apache/spark/scheduler/Task.scala | 17 +++++-
 .../scala/org/apache/spark/util/Utils.scala     | 62 ++++++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala      | 12 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  7 +++
 .../org/apache/spark/deploy/yarn/Client.scala   |  4 +-
 8 files changed, 126 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd47c1d..5ea0b48 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1015,7 +1015,8 @@ class DAGScheduler(
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
+              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
+              Option(sc.applicationId), sc.applicationAttemptId)
           }
 
         case stage: ResultStage =>
@@ -1024,7 +1025,8 @@ class DAGScheduler(
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
+              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
+              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
           }
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 609f10a..1e7c63a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -43,7 +43,12 @@ import org.apache.spark.rdd.RDD
  *                 input RDD's partitions).
  * @param localProperties copy of thread-local properties set by the user on the driver side.
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
- */
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
+  */
 private[spark] class ResultTask[T, U](
     stageId: Int,
     stageAttemptId: Int,
@@ -52,8 +57,12 @@ private[spark] class ResultTask[T, U](
     locs: Seq[TaskLocation],
     val outputId: Int,
     localProperties: Properties,
-    metrics: TaskMetrics)
-  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
+    metrics: TaskMetrics,
+    jobId: Option[Int] = None,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None)
+  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+    appId, appAttemptId)
   with Serializable {
 
   @transient private[this] val preferredLocs: Seq[TaskLocation] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 448fe02..66d6790 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -44,6 +44,11 @@ import org.apache.spark.shuffle.ShuffleWriter
  * @param locs preferred task execution locations for locality scheduling
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
  */
 private[spark] class ShuffleMapTask(
     stageId: Int,
@@ -52,8 +57,12 @@ private[spark] class ShuffleMapTask(
     partition: Partition,
     @transient private var locs: Seq[TaskLocation],
     metrics: TaskMetrics,
-    localProperties: Properties)
-  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
+    localProperties: Properties,
+    jobId: Option[Int] = None,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None)
+  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+    appId, appAttemptId)
   with Logging {
 
   /** A constructor used only in test suites. This does not require passing in an RDD. */

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 48daa34..9385e3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -29,7 +29,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util._
 
 /**
  * A unit of execution. We have two kinds of Task's in Spark:
@@ -47,6 +47,11 @@ import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOu
  * @param partitionId index of the number in the RDD
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
  */
 private[spark] abstract class Task[T](
     val stageId: Int,
@@ -54,7 +59,10 @@ private[spark] abstract class Task[T](
     val partitionId: Int,
     // The default value is only used in tests.
     val metrics: TaskMetrics = TaskMetrics.registered,
-    @transient var localProperties: Properties = new Properties) extends Serializable {
+    @transient var localProperties: Properties = new Properties,
+    val jobId: Option[Int] = None,
+    val appId: Option[String] = None,
+    val appAttemptId: Option[String] = None) extends Serializable {
 
   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -79,9 +87,14 @@ private[spark] abstract class Task[T](
       metrics)
     TaskContext.setTaskContext(context)
     taskThread = Thread.currentThread()
+
     if (_killed) {
       kill(interruptThread = false)
     }
+
+    new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
+      Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
+
     try {
       runTask(context)
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e09666c..caa768c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2441,6 +2441,68 @@ private[spark] object Utils extends Logging {
 }
 
 /**
+ * An utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` will be
+ * constructed by parameters passed in.
+ * When Spark applications run on Yarn and HDFS, its caller contexts will be written into Yarn RM
+ * audit log and hdfs-audit.log. That can help users to better diagnose and understand how
+ * specific applications impacting parts of the Hadoop system and potential problems they may be
+ * creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's
+ * very helpful to track which upper level job issues it.
+ *
+ * @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
+ *
+ * The parameters below are optional:
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
+ * @param jobId id of the job this task belongs to
+ * @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
+ * @param taskId task id
+ * @param taskAttemptNumber task attempt id
+ */
+private[spark] class CallerContext(
+   from: String,
+   appId: Option[String] = None,
+   appAttemptId: Option[String] = None,
+   jobId: Option[Int] = None,
+   stageId: Option[Int] = None,
+   stageAttemptId: Option[Int] = None,
+   taskId: Option[Long] = None,
+   taskAttemptNumber: Option[Int] = None) extends Logging {
+
+   val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
+   val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
+   val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
+   val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
+   val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
+   val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
+   val taskAttemptNumberStr =
+     if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
+
+   val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
+     jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
+
+  /**
+   * Set up the caller context [[context]] by invoking Hadoop CallerContext API of
+   * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
+   */
+  def setCurrentContext(): Boolean = {
+    var succeed = false
+    try {
+      val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
+      val Builder = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
+      val builderInst = Builder.getConstructor(classOf[String]).newInstance(context)
+      val hdfsContext = Builder.getMethod("build").invoke(builderInst)
+      callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
+      succeed = true
+    } catch {
+      case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+    }
+    succeed
+  }
+}
+
+/**
  * A utility class to redirect the child process's stdout or stderr.
  */
 private[spark] class RedirectThread(

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4715fd2..bc28b2d 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -788,6 +788,18 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         .set("spark.executor.instances", "1")) === 3)
   }
 
+  test("Set Spark CallerContext") {
+    val context = "test"
+    try {
+      val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
+      assert(new CallerContext(context).setCurrentContext())
+      assert(s"SPARK_$context" ===
+        callerContext.getMethod("getCurrent").invoke(null).toString)
+    } catch {
+      case e: ClassNotFoundException =>
+        assert(!new CallerContext(context).setCurrentContext())
+    }
+  }
 
   test("encodeFileNameToURIRawPath") {
     assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ad50ea7..aabae14 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,8 @@ private[spark] class ApplicationMaster(
     try {
       val appAttemptId = client.getAttemptId()
 
+      var attemptID: Option[String] = None
+
       if (isClusterMode) {
         // Set the web ui port to be ephemeral for yarn so we don't conflict with
         // other spark processes running on the same box
@@ -196,8 +198,13 @@ private[spark] class ApplicationMaster(
         // Set this internal configuration if it is running on cluster mode, this
         // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
         System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+        attemptID = Option(appAttemptId.getAttemptId.toString)
       }
 
+      new CallerContext("APPMASTER",
+        Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
+
       logInfo("ApplicationAttemptId: " + appAttemptId)
 
       val fs = FileSystem.get(yarnConf)

http://git-wip-us.apache.org/repos/asf/spark/blob/6a68c5d7/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2398f0a..ea4e116 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -54,7 +54,7 @@ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallerContext, Utils}
 
 private[spark] class Client(
     val args: ClientArguments,
@@ -161,6 +161,8 @@ private[spark] class Client(
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
       launcherBackend.setAppId(appId.toString)
 
+      new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
+
       // Verify whether the cluster has enough resources for our AM
       verifyClusterResources(newAppResponse)
 

