Re: spark on k8s - can driver and executor have separate checkpoint location?

2020-05-16 Thread Ali Gouta
Hello,

If you do that, all your executor pods will have to run on the same
Kubernetes worker node, since you are mounting a single volume with a
ReadWriteOnce access mode. By design that does not seem like a good idea, I
assume. You likely need a ReadWriteMany access mode on the volume, plus pod
anti-affinity to make sure the pods do not land on the same node. You can
achieve this by running an NFS filesystem and then creating a PV/PVC that
mounts that shared file system. The persistentVolumeClaim defined in your
YAML should then reference the PVC you created.
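
For illustration, here is a minimal sketch of what that could look like,
assuming an NFS server is already reachable from the cluster (the server
address, export path, names and size below are placeholders, not something
from your setup):

apiVersion: v1
kind: PersistentVolume
metadata:
  name: spark-checkpoint-pv              # placeholder name
spec:
  capacity:
    storage: 10Gi                        # placeholder size
  accessModes:
    - ReadWriteMany                      # mountable read-write from many nodes
  nfs:
    server: nfs-server.example.com       # hypothetical NFS server address
    path: /exports/spark-checkpoint      # hypothetical export path
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: checkpoint-pvc                   # reference this claim in your chart
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""                   # bind statically to the PV above
  resources:
    requests:
      storage: 10Gi

With ReadWriteMany, the driver and all executors can mount the same claim
even when they are scheduled on different nodes, so a single checkpoint
location is enough.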

Best regards,
Ali Gouta.



spark on k8s - can driver and executor have separate checkpoint location?

2020-05-15 Thread wzhan
Hi guys,

I'm running Spark applications on Kubernetes. According to the Spark
documentation
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Spark needs a distributed file system to store its checkpoint data so that,
in case of failure, it can recover from the checkpoint directory.

My question is: can the driver and executors have separate checkpoint
locations? I'm asking because the driver and executor pods might be deployed
on different nodes, and a shared checkpoint location would require the
ReadWriteMany access mode. Since I only have a storage class that supports
ReadWriteOnce, I'm trying to find a workaround.


The Spark Streaming Guide mentions that a "Failed driver can be restarted
from checkpoint information" and that when an executor fails, "Tasks and
receivers restarted by Spark automatically, no config needed".

Given this, I tried configuring a checkpoint location only for the driver
pod. It immediately failed with the exception below:

2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id =
b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f]] ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query
baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
...
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task
0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1):
java.io.IOException: mkdir of
file:/opt/window-data/baseline_windower/state/0/0 failed


So I tried giving separate checkpoint locations to the driver and the
executors.

In the Spark application Helm chart I have a checkpoint location
configuration:

spec:
  sparkConf:
    "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"

I created two checkpoint PVCs and mounted the corresponding volumes on the
driver and executor pods:

  volumes:
- name: checkpoint-driver-volume
  persistentVolumeClaim:
claimName: checkpoint-driver-pvc
- name: checkpoint-executor-volume
  persistentVolumeClaim:
claimName: checkpoint-executor-pvc

  driver:
    volumeMounts:
      - name: checkpoint-driver-volume
        mountPath: "/opt/checkpoint-data"
    ...
  executor:
    volumeMounts:
      - name: checkpoint-executor-volume
        mountPath: "/opt/checkpoint-data"

After deployment it seems to work. I tried restarting the driver pod, and it
did recover from the checkpoint directory. But I'm not sure whether this is
actually supported by design.

Thanks,
wzhan


