bersprockets commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1661030654

   While I have not yet looked at the solution provided here, I can confirm 
that the bug does indeed exist on the master branch.
   
   I was able to reproduce the crash, but I needed some debug statements for 
the timing to be right. The debug statements don't change logic or access any 
data structures (other than the thread name), so they change only timing, not 
behavior. In more complex jobs with lots going on in the JVM, the timing might 
be right even without debug statements.
   
   To get the SIGSEGV, the `WindowExec` iterator needs a `nextRow` value (called 
X in the PR description) whose internal fields point to off-heap memory 
held (indirectly) by `SortExec`. When the generated SMJ code calls 
`cleanUpResources` because the right side of the join is empty, that off-heap 
memory gets released.
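   The ownership problem can be modeled in a few lines of plain Java (a toy 
sketch, not Spark's actual classes; `Page`, `ToySorter`, and `ToyRow` are 
invented names):
   ```java
   // Toy model of the lifetime hazard: a row keeps a reference into a page
   // owned by the sorter, and the sorter's cleanUpResources() invalidates
   // that page while the row is still live.
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.List;

   public class DanglingRowDemo {
       // Stand-in for an off-heap page; `freed` models the page being released.
       static final class Page {
           final byte[] bytes;
           boolean freed = false;
           Page(byte[] bytes) { this.bytes = bytes; }
       }

       // Stand-in for the sorter that (indirectly) owns the pages.
       static final class ToySorter {
           private final List<Page> pages;
           ToySorter(String data) {
               pages = List.of(new Page(data.getBytes(StandardCharsets.UTF_8)));
           }
           Page currentPage() { return pages.get(0); }
           void cleanUpResources() { pages.forEach(p -> p.freed = true); }
       }

       // Stand-in for a row backed by memory it does not own.
       static final class ToyRow {
           private final Page page;
           private final int offset, len;
           ToyRow(Page page, int offset, int len) {
               this.page = page; this.offset = offset; this.len = len;
           }
           byte[] copyBytes() {
               // The real code cannot make this check: it just reads the raw
               // address, and crashes if the page has been unmapped.
               if (page.freed) throw new IllegalStateException("page already freed");
               return Arrays.copyOfRange(page.bytes, offset, offset + len);
           }
       }

       public static void main(String[] args) {
           ToySorter sorter = new ToySorter("row-data");
           ToyRow nextRow = new ToyRow(sorter.currentPage(), 0, 8); // the "X" row
           sorter.cleanUpResources(); // SMJ's right side is empty
           // nextRow.copyBytes() would now read freed memory; this toy throws,
           // while the real code sometimes reads stale bytes and sometimes SIGSEGVs.
       }
   }
   ```
   The real row has no way to perform the `freed` check, which is why the 
failure shows up only as a timing-dependent crash.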
   
   Later, when the `WindowExec` iterator calls `buffer.add(nextRow)`, the 
buffer tries to copy data from the cleaned-up off-heap page pointed at by 
`nextRow`. If you're lucky (which you usually are), that memory is still mapped 
in the JVM's address space, so there's no crash. However, if the memory has 
already been returned to the OS, you get the SIGSEGV:
   ```
   bash-3.2$ bin/spark-sql --driver-memory 450m --conf 
spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=3m --master 
"local[2]" --conf spark.sql.shuffle.partitions=2
   ...
   spark-sql (default)> create or replace temp view leftside as
   select a, b, c, sum(c) over(partition by a order by b range between 2 
preceding and current row) as sumc
   from t1;
   Time taken: 1.692 seconds
   spark-sql (default)> create or replace temp view rightside as
   select a, b, c, sum(c) over(partition by a order by b range between 2 
preceding and current row) as sumc
   from t2;
   Time taken: 0.082 seconds
   spark-sql (default)> set spark.sql.adaptive.enabled=false;
   spark.sql.adaptive.enabled   false
   Time taken: 0.11 seconds, Fetched 1 row(s)
   spark-sql (default)> set spark.sql.autoBroadcastJoinThreshold=-1;
   spark.sql.autoBroadcastJoinThreshold -1
   Time taken: 0.018 seconds, Fetched 1 row(s)
   spark-sql (default)> create or replace temp view joined as
   select l.a, l.sumc, r.a as ra, r.sumc as rsumc
   from leftside l
   join rightside r
   on l.a = r.a;
   Time taken: 0.178 seconds
   spark-sql (default)> drop table if exists myoutput; create table myoutput 
stored as parquet as select * from joined;
   23/08/01 12:50:00 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
   Time taken: 0.571 seconds
   23/08/01 12:50:00 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, 
since hive.security.authorization.manager is set to instance of 
HiveAuthorizerFactory.
   23/08/01 12:50:01 WARN HiveConf: HiveConf of name 
hive.internal.ss.authz.settings.applied.marker does not exist
   23/08/01 12:50:01 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout 
does not exist
   23/08/01 12:50:01 WARN HiveConf: HiveConf of name hive.stats.retries.wait 
does not exist
   23/08/01 12:50:01 WARN HiveMetaStore: Location: 
file:/Users/bruce/github/spark_fork_smj_issue/spark-warehouse/myoutput 
specified for non-external table:myoutput
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) &&& sorter 
cleanupResources
   Executor task launch worker for task 0.0 in stage 2.0 (TID 2) &&& sorter 
cleanupResources
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) >>> In 
processRows
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) ^^^ WindowExec 
fetching next partition
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) %%% Adding row 
to buffer
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) ^^^ WindowExec 
getting next from buffer
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) +++ Getting 
next streamed row
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) +++ Returning 
false
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) >>> Performing 
eager cleanup
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) &&& sorter 
cleanupResources
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) &&& sorter 
cleanupResources
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) >>> In 
processRows
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) ^^^ WindowExec 
fetching next partition
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) %%% Adding row 
to buffer
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x000000010f4c19a0, pid=96472, tid=52259
   #
   # JRE version: Java(TM) SE Runtime Environment 18.9 (11.0.12+8) (build 
11.0.12+8-LTS-237)
   # Java VM: Java HotSpot(TM) 64-Bit Server VM 18.9 (11.0.12+8-LTS-237, mixed 
mode, tiered, compressed oops, g1 gc, bsd-amd64)
   # Problematic frame:
   # V  [libjvm.dylib+0x1329a0]  acl_CopyRight+0x29
   #
   ...
   ```
   The error report's stack trace looks slightly different from the one in the 
PR description, because I am running this on the master branch (which uses 
`WindowEvaluatorFactory`), but it dies in the same place 
(`ExternalAppendOnlyUnsafeRowArray.add`):
   ```
   Stack: [0x0000700010666000,0x0000700010766000],  sp=0x0000700010764308,  
free space=1016k
   Native frames: (J=compiled Java code, A=aot compiled Java code, 
j=interpreted, Vv=VM code, C=native code)
   V  [libjvm.dylib+0x1329a0]  acl_CopyRight+0x29
   J 4228  
jdk.internal.misc.Unsafe.copyMemory0(Ljava/lang/Object;JLjava/lang/Object;JJ)V 
[email protected] (0 bytes) @ 0x00000001249c88b4 
[0x00000001249c87c0+0x00000000000000f4]
   J 4226 c1 
jdk.internal.misc.Unsafe.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V 
[email protected] (33 bytes) @ 0x000000011da6e92c 
[0x000000011da6e0e0+0x000000000000084c]
   j  sun.misc.Unsafe.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V+11 
[email protected]
   j  
org.apache.spark.unsafe.Platform.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V+34
   j  
org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy()Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;+37
   j  
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V+48
   j  
org.apache.spark.sql.execution.window.WindowEvaluatorFactory$WindowPartitionEvaluator$$anon$1.fetchNextPartition()V+55
   j  
org.apache.spark.sql.execution.window.WindowEvaluatorFactory$WindowPartitionEvaluator$$anon$1.next()Lorg/apache/spark/sql/catalyst/InternalRow;+61
   j  
org.apache.spark.sql.execution.window.WindowEvaluatorFactory$WindowPartitionEvaluator$$anon$1.next()Ljava/lang/Object;+1
   j  
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext()V+7
   j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
   j  
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext()Z+4
   j  
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+11
   j  
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V+412
   j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
   j  
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext()Z+4
   j  
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(Lscala/collection/Iterator;)V+3
   j  
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;Lscala/collection/Iterator;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+2
   ...
   ```
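   For reference, the bottom of that stack corresponds roughly to the following 
sketch (simplified and hedged, not Spark's actual implementation of 
`UnsafeRow.copy`): the copy boils down to `Unsafe.copyMemory` from 
`(baseObject, baseOffset)` into a freshly allocated byte array.
   ```java
   import java.lang.reflect.Field;
   import java.nio.charset.StandardCharsets;
   import sun.misc.Unsafe;

   public class UnsafeRowCopySketch {
       static final Unsafe UNSAFE;
       static {
           try {
               Field f = Unsafe.class.getDeclaredField("theUnsafe");
               f.setAccessible(true);
               UNSAFE = (Unsafe) f.get(null);
           } catch (ReflectiveOperationException e) {
               throw new ExceptionInInitializerError(e);
           }
       }
       static final long BYTE_ARRAY_OFFSET = Unsafe.ARRAY_BYTE_BASE_OFFSET;

       // For an off-heap row, baseObject is null and baseOffset is an absolute
       // address; if that address lies in a page the sorter has already freed,
       // this native memcpy is the SIGSEGV in the report. Copying from an
       // on-heap array, as below, is always safe.
       static byte[] copyRowBytes(Object baseObject, long baseOffset, int sizeInBytes) {
           byte[] buf = new byte[sizeInBytes];
           UNSAFE.copyMemory(baseObject, baseOffset, buf, BYTE_ARRAY_OFFSET, sizeInBytes);
           return buf;
       }

       public static void main(String[] args) {
           byte[] src = "row-bytes".getBytes(StandardCharsets.UTF_8);
           byte[] dst = copyRowBytes(src, BYTE_ARRAY_OFFSET, src.length);
           System.out.println(new String(dst, StandardCharsets.UTF_8));
       }
   }
   ```
   Nothing in this path validates the source address, so whether you get stale 
bytes or a crash depends entirely on whether the freed page is still mapped.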
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
