sririshindra commented on code in PR #49814:
URL: https://github.com/apache/spark/pull/49814#discussion_r1946734535
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -722,6 +722,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
appStatusSource.foreach(_env.metricsSystem.registerSource(_))
_plugins.foreach(_.registerMetrics(applicationId))
+
+ new CallerContext("DRIVER", config.get(APP_CALLER_CONTEXT),
+ Option(applicationId), applicationAttemptId).setCurrentContext()
Review Comment:
Done
##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##########
@@ -171,6 +172,11 @@ private[hive] class HiveClientImpl(
private def newState(): SessionState = {
val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig,
Some(initClassLoader))
val state = new SessionState(hiveConf)
+ // When SessionState is initialized, the caller context is overridden by
hive
+ // so we need to reset it back to the DRIVER
+ new CallerContext("DRIVER",
+ sparkConf.get(APP_CALLER_CONTEXT),
+
Option(sparkConf.getOption("spark.app.id").getOrElse(""))).setCurrentContext()
Review Comment:
Done.
##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##########
@@ -171,6 +172,11 @@ private[hive] class HiveClientImpl(
private def newState(): SessionState = {
val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig,
Some(initClassLoader))
val state = new SessionState(hiveConf)
+ // When SessionState is initialized, the caller context is overridden by
hive
+ // so we need to reset it back to the DRIVER
Review Comment:
I see, I didn't know that iceberg constructs its own hive client instance. I
am not quite sure how it is being propagated. But I know it is being propagated
properly based on testing. I tested this by running both batch and streaming
jobs that do writes and reads from an Iceberg table.
The way I identified the best location to set the caller context is by first
identifying all the places it is being set elsewhere by setting a breakpoint on
the setCurrent method in the CallerContext.java file in hadoop library. I then
let a spark streaming (and also a batch) job on a cluster write/read from an
iceberg table and I recorded the stack traces where setCurrent method is being
called by attaching a remote debugger. I then identified the shared execution
paths in all these stacktraces and I decided that the best place to reset
the caller context in spark is here in the HiveClientImpl file after the
SessionState is initialized.
Like you said, maybe iceberg constructs its own hive client instance, but maybe
that codepath doesn't set the caller context? I have checked the Iceberg
codebase and I have found the location where a hive client is being created
([newClient](https://github.com/apache/iceberg/blob/afda8be25652d44d9339a79c6797b6bf20c55bd6/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java#L60)),
but I am not sure whether that in turn calls the setCurrent method.
Here are the stacktraces where the setCurrent method was being called in my
testing.
```
Breakpoint reached
at
org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
at
org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopCallerContext(Hadoop23Shims.java:546)
at
org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopSessionContext(Hadoop23Shims.java:556)
at
org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:508)
at
org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:475)
at
org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:471)
at
org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:195)
at
org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143)
at
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown
Source:-1)
at
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
at
org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
at
org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
at
org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
at
org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180)
at
org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178)
at
org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)
at
org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown
Source:-1)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008)
at
org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:834)
Breakpoint reached at
org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
Breakpoint reached
at
org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
at
org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopCallerContext(Hadoop23Shims.java:546)
at
org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopSessionContext(Hadoop23Shims.java:556)
at
org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:510)
at
org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:475)
at
org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:471)
at
org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:195)
at
org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143)
at
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown
Source:-1)
at
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
at
org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
at
org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
at
org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
at
org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180)
at
org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178)
at
org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)
at
org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown
Source:-1)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008)
at
org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:834)
Breakpoint reached at
org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
Breakpoint reached
at
org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.spark.util.CallerContext.setCurrentContext(Utils.scala:3414)
at
org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:202)
at
org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143)
at
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)
at
org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)
at
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown
Source:-1)
at
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
at
org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
at
org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
at
org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
at
org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180)
at
org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178)
at
org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)
at
org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown
Source:-1)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008)
at
org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
at
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:834)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]