sigmod commented on pull request #32210:
URL: https://github.com/apache/spark/pull/32210#issuecomment-854380297


   > Sorry for the late update. I went through https://cs-people.bu.edu/mathan/reading-groups/papers-classics/join.pdf and https://www.db-book.com/db4/slide-dir/ch13.ppt, and I have a new proposal for this feature, as follows.
   > 
   > At a high level, the feature is:
   > 
   > 1. Build the hash table (`HashedRelation`) on the build side.
   > 2. If step 1 succeeds, do a shuffled hash join with the streamed side.
   > 3. If step 1 fails (either due to an OOM, i.e. `SparkOutOfMemoryError`, or due to a configurable threshold being exceeded; more details below):
   >    3.1. Add the rest of the rows from the build side to a sorter (via `UnsafeExternalRowSorter.insertRow`; call this sorter `buildSorter`).
   >    3.2. Process rows from the streamed side. Probe the hash table first to do the shuffled hash join. If there are matched rows, output them and process the next row. If there are no matched rows, add the streamed-side row to another sorter (`streamedSorter`).
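   
   To make sure I read steps 1 through 3.2 correctly, here is a minimal standalone sketch of the proposed flow. Everything in it (`Row`, `SpillingSorter`, `hybridJoin`, `maxHashTableRows`) is a simplified stand-in of my own, not Spark's actual `HashedRelation` / `UnsafeExternalRowSorter` machinery, and the OOM is simulated with a row-count threshold:
   
   ```scala
   import scala.collection.mutable
   
   // Simplified stand-ins, not Spark internals.
   case class Row(key: Int, value: String)
   
   // Stand-in for an external sorter that can spill to disk.
   class SpillingSorter {
     private val rows = mutable.ArrayBuffer.empty[Row]
     def insertRow(r: Row): Unit = rows += r
     def sortedByKey: Seq[Row] = rows.sortBy(_.key).toSeq
   }
   
   def hybridJoin(build: Iterator[Row], streamed: Iterator[Row],
                  maxHashTableRows: Int): Seq[(Row, Row)] = {
     val out = mutable.ArrayBuffer.empty[(Row, Row)]
     val hashTable = mutable.Map.empty[Int, mutable.ArrayBuffer[Row]]
     val buildSorter = new SpillingSorter
     val streamedSorter = new SpillingSorter
     var spilled = false
     var built = 0
   
     // Step 1 / 3.1: build the hash table; once the (simulated) memory
     // threshold is hit, all remaining build rows go to buildSorter.
     for (r <- build) {
       if (!spilled && built < maxHashTableRows) {
         hashTable.getOrElseUpdate(r.key, mutable.ArrayBuffer.empty[Row]) += r
         built += 1
       } else {
         spilled = true
         buildSorter.insertRow(r) // step 3.1
       }
     }
   
     // Step 2 / 3.2: probe with the streamed side.
     for (p <- streamed) {
       hashTable.get(p.key) match {
         case Some(matches) =>
           // A matched streamed row is output and dropped here; it never
           // reaches streamedSorter. This is the issue raised below.
           matches.foreach(b => out += ((p, b)))
         case None if spilled =>
           streamedSorter.insertRow(p) // step 3.2
         case None =>
           // inner join: unmatched streamed row, nothing to emit
       }
     }
   
     // Step 3 (cont.): sort-merge buildSorter against streamedSorter and
     // append those matches to out (merge pass elided for brevity).
     out.toSeq
   }
   ```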
   
   Is 3.1 + 3.2 correct? Say the build side has two rows `r1` and `r2` with identical values in the join key columns, where `r1` is processed before the OOM and inserted into the in-memory hash table, while `r2` is processed after the OOM and goes to `buildSorter`. Now a probe-side row `p` with the same join key values can only match `r1`, because `p` finds a match in step 3.2 and is therefore never added to `streamedSorter`. For correctness, `p` should match `r2` as well.
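   
   For concreteness, running the sketch above on exactly that scenario shows the miss:
   
   ```scala
   // Same join key on both build rows; a threshold of 1 forces the
   // simulated OOM after r1 is inserted into the hash table, so r2
   // goes to buildSorter.
   val result = hybridJoin(
     build = Iterator(Row(1, "r1"), Row(1, "r2")),
     streamed = Iterator(Row(1, "p")),
     maxHashTableRows = 1)
   
   // Only (p, r1) is produced. p matched the hash table, so it was never
   // added to streamedSorter, and the later sort-merge pass over
   // buildSorter (which holds r2) can never emit (p, r2).
   assert(result == Seq((Row(1, "p"), Row(1, "r1"))))
   ```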
   

