[ 
https://issues.apache.org/jira/browse/SPARK-36534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jahar updated SPARK-36534:
--------------------------
    Description: 
I am running Spark on Kubernetes in client mode. The Spark driver is spawned 
programmatically (no spark-submit). Below is dummy code that sets up a 
SparkSession with the Kubernetes API server as master.

 
{code:java}
private static SparkSession getSparkSession() {
    SparkSession.Builder mySparkSessionBuilder = SparkSession.builder()
            .master("k8s://http://<some_IP>:6443")
            .appName("spark-K8sDemo")
            .config("spark.kubernetes.container.image", "spark:3.0")
            .config("spark.jars", "/tmp/jt/database-0.0.1-SNAPSHOT-jar-with-dependencies.jar")
            .config("spark.kubernetes.executor.podTemplateFile", "/tmp/jt/sparkExecutorPodTemplate.yaml")
            .config("spark.kubernetes.container.image.pullPolicy", "Always")
            .config("spark.kubernetes.namespace", "my_namespace")
            .config("spark.driver.host", "spark-driver-example")
            .config("spark.driver.port", "29413")
            .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
            .config("spark.extraListeners", "K8sPoc.MyHealthCheckListener");
    setAdditionalConfig();
    SparkSession mySession = mySparkSessionBuilder.getOrCreate();
    return mySession;
}
{code}
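The spark.extraListeners entry above names a custom K8sPoc.MyHealthCheckListener. A listener alone cannot unstick the application, but it can expose a readiness signal the driver can wait on with a bound. Below is a minimal, Spark-free sketch of that signal's core; the class and method names are illustrative assumptions, not code from this report. In the real listener (which would extend org.apache.spark.scheduler.SparkListener), the onExecutorAdded callback would call markReady().

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Readiness signal a health-check listener could expose: the latch trips
// once the first executor registers, so the driver can wait with a timeout
// instead of hanging forever.
public class ExecutorReadiness {
    private static final CountDownLatch FIRST_EXECUTOR = new CountDownLatch(1);

    /** Called from the listener's onExecutorAdded callback. */
    public static void markReady() {
        FIRST_EXECUTOR.countDown();
    }

    /** Driver-side wait: true if an executor registered within the timeout. */
    public static boolean awaitReady(long seconds) throws InterruptedException {
        return FIRST_EXECUTOR.await(seconds, TimeUnit.SECONDS);
    }
}
{code}

The driver would call ExecutorReadiness.awaitReady(60) after getOrCreate() and fail fast if it returns false, rather than submitting tasks into a scheduler that can never acquire executors.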
 

Now the problem is that in certain scenarios, for example if the K8s master is 
not reachable, the master URL is incorrect, or the 
spark.kubernetes.container.image config is missing, Spark throws the exceptions 
shown below (*Exception 1* and *Exception 2*).

These exceptions are never propagated to the Spark driver program, which leaves 
the Spark application stuck forever.

There should be a way to know, via the SparkSession or SparkContext object, 
whether the session was created successfully without any such exceptions and 
is able to run Spark tasks.

I have looked at the SparkSession and SparkContext API documentation and at 
SparkListeners, but found no way to check whether the SparkSession is ready to 
run tasks, or, if it is not, to return a proper error/warning message to the 
calling API instead of leaving the Spark application hanging.
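In the absence of such an API, one workaround is to race a tiny probe job against a timeout: if even a trivial job cannot complete, the session is not ready. The sketch below is a hedged suggestion, not a Spark API; the helper is generic so the Spark call stays outside it. With Spark one would pass e.g. () -> spark.range(3).count() == 3L as the probe (an assumed usage, matching the stock SparkSession API).

{code:java}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Workaround sketch: run a probe task with a deadline. A hung scheduler
// (e.g. "Initial job has not accepted any resources") shows up as a
// TimeoutException; a failed probe as an ExecutionException. Both map to
// "not ready" so the caller can fail fast instead of hanging.
public class SessionProbe {
    public static boolean probeWithTimeout(Callable<Boolean> probe, long timeoutSeconds) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(probe).get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            return false; // never scheduled, hung, or failed -> treat as not ready
        } finally {
            pool.shutdownNow(); // interrupt a still-running probe
        }
    }
}
{code}

This is only a driver-side mitigation; it does not surface the underlying Kubernetes exception, which is the actual improvement this issue requests.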

 

*Exception 1 (if the _spark.kubernetes.container.image_ config is missing):*

 
{noformat}
21/08/16 16:27:07 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/08/16 16:27:07 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
21/08/16 16:27:07 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
	at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.$anonfun$executorContainerImage$1(BasicExecutorFeatureStep.scala:41)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:41)
	at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$16(ExecutorPodsAllocator.scala:216)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:208)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:82)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1$adapted(ExecutorPodsAllocator.scala:82)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$callSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:110)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:107)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:71)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{noformat}

*Exception 2 (if the _K8s master_ is not reachable or the URL is wrong):*
{noformat}
21/08/16 16:45:07 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [create] for kind: [Pod] with name: [null] in namespace: [default] failed.
	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:338)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$16(ExecutorPodsAllocator.scala:222)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:208)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:82)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1$adapted(ExecutorPodsAllocator.scala:82)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$callSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:110)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:107)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:71)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
	at java.net.Socket.connect(Socket.java:606)
	at okhttp3.internal.platform.Platform.connectSocket(Platform.java:129)
	at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:247)
	at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:167)
	at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:258)
	at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
	at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
	at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:127)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:134)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:111)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
	at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:257)
	at okhttp3.RealCall.execute(RealCall.java:93)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:469)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:430)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:251)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:815)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:333)
	... 16 more
21/08/16 16:45:08 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes.
21/08/16 16:45:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
21/08/16 16:45:11 WARN WatchConnectionManager: Exec Failure
{noformat}

 


>  No way to check If Spark Session is created successfully with No Exceptions 
> and ready to execute Tasks
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36534
>                 URL: https://issues.apache.org/jira/browse/SPARK-36534
>             Project: Spark
>          Issue Type: Improvement
>          Components: Java API, Kubernetes, Scheduler
>    Affects Versions: 3.0.1
>         Environment: *Spark 3.0.1*
>            Reporter: Jahar
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
