anishshri-db opened a new pull request, #42742:
URL: https://github.com/apache/spark/pull/42742

   ### What changes were proposed in this pull request?
   Allow the block manager memory store iterator to handle thread interrupts and 
perform task completion gracefully.
   
   
   ### Why are the changes needed?
   Currently, `putIteratorAsBytes` can remain hung even after a thread interrupt 
is received on task cancellation, eventually leading to the task reaper killing 
the executor JVM. The root cause is that the interrupt is never processed inside 
the while loop that unrolls the block, so the task keeps running beyond the 
reaper timeout.
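   The failure mode comes down to JVM interrupt semantics: `Thread.interrupt()` only sets a flag, so a CPU-bound loop that never checks the flag runs forever. A minimal sketch of the idea behind the fix (hypothetical names, not Spark's actual `MemoryStore` code): check the interrupt status on each iteration of the unroll-style loop and bail out so task cleanup can run.
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Illustration only: an unroll-style loop that observes thread interrupts.
   // Names here are hypothetical and do not mirror Spark's internal API.
   public class InterruptibleUnroll {
   
       /** Consumes values one by one; returns true if it finished, false if interrupted. */
       static boolean unroll(java.util.Iterator<Integer> values) {
           while (values.hasNext()) {
               // Without this check, a CPU-bound loop never notices Thread.interrupt(),
               // since interruption only sets a flag -- it does not stop the thread.
               if (Thread.currentThread().isInterrupted()) {
                   return false; // give up gracefully so task completion/cleanup can proceed
               }
               values.next(); // stand-in for serializing/storing one value
           }
           return true;
       }
   
       public static void main(String[] args) throws Exception {
           AtomicBoolean finished = new AtomicBoolean(true);
           Thread worker = new Thread(() -> {
               // An effectively endless iterator simulates a huge block being unrolled.
               java.util.Iterator<Integer> endless =
                   java.util.stream.IntStream.iterate(0, i -> i + 1).iterator();
               finished.set(unroll(endless));
           });
           worker.start();
           Thread.sleep(100);   // let the loop spin for a while
           worker.interrupt();  // simulate task kill via thread interrupt
           worker.join(5000);   // loop should exit promptly, well before any reaper timeout
           System.out.println("worker alive: " + worker.isAlive());
           System.out.println("finished normally: " + finished.get());
       }
   }
   ```
   
   Without the `isInterrupted()` check, the worker above would spin until the join timeout, which is exactly the hang the reaper then escalates by killing the JVM.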
   
   Relevant log excerpts for a particular task/thread:
   ```
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:55427:23/08/29 12:01:51 
INFO CoarseGrainedExecutorBackend: Got assigned task 222564684
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:55494:23/08/29 12:01:51 
INFO Executor: Running task 6.0 in stage 900216.0 (TID 222564684)
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:55983:23/08/29 12:03:22 
INFO Executor: Executor is trying to kill task 6.0 in stage 900216.0 (TID 
222564684), reason: another attempt succeeded
   ```
   
   ```
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:55987:23/08/29 12:03:22 
INFO ShuffleMapTask: Trying to kill task 6.0 in stage 900216.0 (TID 222564684) 
with reason=another attempt succeeded and current stackTrace:
           net.jpountz.lz4.LZ4JNI.LZ4_compress_limitedOutput(Native Method)
           at 
net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
           at net.jpountz.lz4.LZ4Compressor.compress(LZ4Compressor.java:95)
           at 
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:208)
           at 
net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
           at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
           at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
           at 
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1460)
           at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
           at 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
           at 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
           at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
           at 
org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:728)
           at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
           at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
           at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1447)
           at 
org.apache.spark.storage.BlockManager$$Lambda$732/1315363341.apply(Unknown 
Source)
           at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1357)
           at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1421)
           at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1240)
           at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
           at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
           at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
           at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
           at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
           at 
org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
           at 
org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1195/1841467537.apply(Unknown 
Source)
           at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
           at 
org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
           at 
org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1050/445760861.apply(Unknown 
Source)
           at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
           at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
           at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
           at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
           at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
           at 
org.apache.spark.scheduler.Task$$Lambda$925/288368281.apply(Unknown Source)
           at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
           at org.apache.spark.scheduler.Task.run(Task.scala:94)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:819)
           at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$909/144086205.apply(Unknown
 Source)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
           at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:822)
           at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$793/213482103.apply$mcV$sp(Unknown
 Source)
           at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
           at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
           at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:678)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   
   ...
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:58310:23/08/29 12:04:31 
WARN Executor: Killed task 222564684 is still running after 68908 ms
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:58361:23/08/29 12:04:31 
WARN Executor: Thread dump from task 222564684:
   java.lang.System.identityHashCode(Native Method)
   java.io.ObjectOutputStream$HandleTable.hash(ObjectOutputStream.java:2360)
   java.io.ObjectOutputStream$HandleTable.lookup(ObjectOutputStream.java:2293)
   java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1116)
   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
   org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
   org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:728)
   org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
   org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
   org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1447)
   org.apache.spark.storage.BlockManager$$Lambda$732/1315363341.apply(Unknown Source)
   org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1357)
   org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1421)
   org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1240)
   org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
   org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
   org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
   org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
   org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
   org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1195/1841467537.apply(Unknown Source)
   com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
   org.apache.spark.scheduler.ShuffleMapTask$$Lambda$1050/445760861.apply(Unknown Source)
   com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
   org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
   org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
   org.apache.spark.scheduler.Task$$Lambda$925/288368281.apply(Unknown Source)
   com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   org.apache.spark.scheduler.Task.run(Task.scala:94)
   org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:819)
   org.apache.spark.executor.Executor$TaskRunner$$Lambda$909/144086205.apply(Unknown Source)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1657)
   org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:822)
   org.apache.spark.executor.Executor$TaskRunner$$Lambda$793/213482103.apply$mcV$sp(Unknown Source)
   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:678)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:750)
   
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:58388:org.apache.spark.SparkException: Killing executor JVM because killed task 222564684 could not be stopped within 60000 ms.
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:58810:java.lang.Error: org.apache.spark.SparkException: Killing executor JVM because killed task 222564684 could not be stopped within 60000 ms.
   10.1.121.105/app-20230824190614-0000/735/stderr.txt:58814:Caused by: org.apache.spark.SparkException: Killing executor JVM because killed task 222564684 could not be stopped within 60000 ms.
   ```
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Ran `MemoryStoreSuite`:
   ```
   [info] MemoryStoreSuite:
   [info] - reserve/release unroll memory (36 milliseconds)
   [info] - safely unroll blocks (70 milliseconds)
   [info] - safely unroll blocks through putIteratorAsValues (10 milliseconds)
   [info] - safely unroll blocks through putIteratorAsValues off-heap (21 
milliseconds)
   [info] - safely unroll blocks through putIteratorAsBytes (138 milliseconds)
   [info] - PartiallySerializedBlock.valuesIterator (6 milliseconds)
   [info] - PartiallySerializedBlock.finishWritingToStream (5 milliseconds)
   [info] - multiple unrolls by the same thread (8 milliseconds)
   [info] - lazily create a big ByteBuffer to avoid OOM if it cannot be put 
into MemoryStore (3 milliseconds)
   [info] - put a small ByteBuffer to MemoryStore (3 milliseconds)
   [info] - SPARK-22083: Release all locks in evictBlocksToFreeSpace (43 
milliseconds)
   [info] - put user-defined objects to MemoryStore and remove (5 milliseconds)
   [info] - put user-defined objects to MemoryStore and clear (4 milliseconds)
   [info] Run completed in 1 second, 587 milliseconds.
   [info] Total number of tests run: 13
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

