attilapiros opened a new pull request, #36512:
URL: https://github.com/apache/spark/pull/36512
### What changes were proposed in this pull request?
Deregistering disk-persisted local RDD blocks from the block manager in case
of IO-related errors, when all (a fixed number of) retries have failed.
### Why are the changes needed?
In case of a disk corruption, a disk-persisted RDD block will lead to job
failure, as the block registration always points to the same file. So even
when the task is rescheduled on a different executor, the job will fail.
Example:
First failure (the block is locally available):
```
22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 12.0 (TID 51853)
java.io.StreamCorruptedException: invalid stream header: 00000000
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
```
Then the task might be rescheduled on a different executor, but as the block
is still registered to the first block manager, the corrupted data is fetched
remotely and the error will be the same:
```
java.io.StreamCorruptedException: invalid stream header: 00000000
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
at scala.Option.map(Option.scala:146)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
```
My idea is to retry the IO operation a few times and, when all retries have
failed, deregister the block and let the following task recompute it (a minimal
sketch of this flow follows below). This PR targets only local blocks. In a
follow-up PR `getRemoteValues` can be extended with the block removal.
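To make the control flow concrete, here is a minimal sketch of the retry-then-deregister idea. It is not the PR's actual code: `readLocalBlockWithRetries`, `readBlock`, `deregisterBlock` and `maxReadRetries` are hypothetical names used only for illustration; the real change goes through the existing `BlockManager` read paths shown in the stack traces above.
```scala
import java.io.IOException
import scala.annotation.tailrec

// Hypothetical sketch of the retry-then-deregister idea; none of these names
// come from the PR itself.
object RetryThenDeregisterSketch {

  def readLocalBlockWithRetries[T](
      blockId: String,
      maxReadRetries: Int)(
      readBlock: => T)(
      deregisterBlock: String => Unit): Option[T] = {

    @tailrec
    def attempt(retryCount: Int): Option[T] = {
      // Only IO-related errors are retried; anything else propagates as before.
      val result =
        try Right(readBlock)
        catch { case e: IOException => Left(e) }

      result match {
        case Right(values) =>
          Some(values)                    // success: return the deserialized values
        case Left(_) if retryCount < maxReadRetries =>
          attempt(retryCount + 1)         // retry the read a fixed number of times
        case Left(_) =>
          deregisterBlock(blockId)        // all retries failed: drop the block registration
          None                            // the following task recomputes the block instead
      }
    }

    attempt(0)
  }
}
```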
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1) An existing unit test was extended.
2) Manually.
#### Manual testing
Start Spark:
```
$ ./bin/spark-shell --master "local-cluster[3,1,1200]" --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
```
Create a persisted RDD (here via a DF):
```
scala> val df = sc.parallelize(1 to 20, 4).toDF
...
scala> df.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
...
scala> df.show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
+-----+
```
Now that the blocks are persisted, let's corrupt one of the files. For this we
have to find the directory where the blocks are stored:
```
$ grep "DiskBlockManager: Created local directory"
work/app-20220511112820-0000/*/stdout
work/app-20220511112820-0000/0/stdout:22/05/11 11:28:21 INFO
DiskBlockManager: Created local directory at
/private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-0f4c0d32-8f12-447f-add3-5cfbd4a7c777/blockmgr-dde20b67-a824-4d92-9023-8fa902588a26
work/app-20220511112820-0000/1/stdout:22/05/11 11:28:21 INFO
DiskBlockManager: Created local directory at
/private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-05de3de7-60ca-4954-8baa-965da3c35ce5/blockmgr-71c559a6-f0e8-42a1-bf53-3bddb4a69618
work/app-20220511112820-0000/2/stdout:22/05/11 11:28:21 INFO
DiskBlockManager: Created local directory at
/private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c
```
Let's write something into one of the RDD block files:
```
vim /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
```
Use the DF/RDD one more time:
```
scala> df.show()
22/05/11 11:30:41 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 7) (192.168.1.65 executor 2): java.io.StreamCorruptedException: invalid stream header: 41ACED00
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:66)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:137)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:212)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:967)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1277)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1344)
...
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
+-----+
```
Check the logs:
```
$ cat work/app-20220511112820-0000/2/stdout
...
22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 7
22/05/11 11:30:41 INFO Executor: Running task 0.0 in stage 3.0 (TID 7)
...
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 0 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 1 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 2 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 3 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
22/05/11 11:30:41 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.io.StreamCorruptedException: invalid stream header: 41ACED00
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938) ~[?:1.8.0_322]
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396) ~[?:1.8.0_322]
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66) ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_322]
22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 8
22/05/11 11:30:41 INFO Executor: Running task 0.1 in stage 3.0 (TID 8)
22/05/11 11:30:41 INFO Executor: Finished task 0.1 in stage 3.0 (TID 8). 1623 bytes result sent to driver
```