yugan95 opened a new pull request, #56542:
URL: https://github.com/apache/spark/pull/56542

   ### What changes were proposed in this pull request?
   
   This PR introduces **Distributed Map Join (DMJ)**, a new join strategy that 
avoids full probe-side shuffle by building a distributed hash table service for 
the build side. The probe side performs batched RPC lookups instead of 
shuffling.
   
   **How it works:**
   1. Build side is hash-partitioned into a fixed number of shards and 
materialized as `HashedRelation` on executors, with configurable replica 
placement for fault tolerance.
   2. A bloom filter is constructed over the build-side keys for probe-side 
pre-filtering.
   3. Probe-side tasks buffer keys into batches, send async RPC lookups to the 
shard-holding executors, and stream matched rows back — no shuffle required on 
the probe side.
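
The three steps above can be sketched in a few lines of single-process Python. This is only an illustration of the data flow, not the PR's implementation: `shard_of`, `build_shards`, `build_key_filter`, `lookup_batch`, and `probe` are hypothetical names, a plain `set` stands in for the bloom filter (so it has no false positives), a function call stands in for the async RPC, and replica placement is left out.

```python
from collections import defaultdict


def shard_of(key, shard_count):
    # Hypothetical shard assignment; the hash function used by the PR is not shown here.
    return hash(key) % shard_count


def build_shards(build_rows, shard_count):
    """Step 1: hash-partition build-side (key, value) rows into per-shard hash tables."""
    shards = [defaultdict(list) for _ in range(shard_count)]
    for key, value in build_rows:
        shards[shard_of(key, shard_count)][key].append(value)
    return shards


def build_key_filter(build_rows):
    """Step 2: collect build-side keys; a set stands in for the bloom filter."""
    return {key for key, _ in build_rows}


def lookup_batch(shard, keys):
    """Stand-in for one RPC: return the matches for a batch of keys from one shard."""
    return {k: shard[k] for k in keys if k in shard}


def probe(probe_rows, shards, key_filter, max_batch_size):
    """Step 3: pre-filter probe keys, buffer them per shard, look them up in batches."""
    shard_count = len(shards)
    pending = defaultdict(list)  # shard id -> buffered (key, probe_value) rows
    joined = []

    def flush(shard_id):
        rows = pending.pop(shard_id, [])
        matches = lookup_batch(shards[shard_id], [k for k, _ in rows])
        for key, probe_value in rows:
            for build_value in matches.get(key, []):
                joined.append((key, probe_value, build_value))

    for key, probe_value in probe_rows:
        if key not in key_filter:
            continue  # filtered out before any lookup is sent
        shard_id = shard_of(key, shard_count)
        pending[shard_id].append((key, probe_value))
        if len(pending[shard_id]) >= max_batch_size:
            flush(shard_id)
    for shard_id in list(pending):
        flush(shard_id)  # send the remaining partial batches
    return joined
```

The sketch produces inner-join output only. The point it is meant to show is that probe rows never move between partitions: only batches of keys travel to the shard that owns them, and only matches travel back.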
   
   The strategy is triggered explicitly via SQL hint:
   ```sql
   SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ *
   FROM probe_table JOIN build_table ON probe_table.key = build_table.key
   ```
   
   **Key components:**
   
   | Component | Module | Description |
   |---|---|---|
   | `ShardManager` / `ShardManagerMaster` | core | Shard lifecycle, location tracking, replica placement |
   | `ShardLookupService` | core | Netty-based RPC layer for shard data transfer (pluggable via `spark.shard.service`) |
   | `ShardExchangeExec` | sql/core | Physical exchange operator that partitions build side into shards |
   | `DistributedMapJoinExec` | sql/core | Physical join operator with async batched RPC lookups |
   | `ShardQueryStageExec` | sql/core | AQE integration |
   | `ShardDistribution` / `ShardPartitioning` | sql/catalyst | Distribution and partitioning for shard exchange |
   | `BufferedShardRowMap` | sql/core | Off-heap key-value buffer for batching probe-side lookups |
   | `ResolveHints` (extended) | sql/catalyst | Hint parsing for `DISTMAPJOIN` |
   
   Supported join types: Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, 
ExistenceJoin.
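
For readers less familiar with these join types, the following minimal sketch shows what each one emits for a single probe-side row once its build-side matches have come back from a lookup. It reflects the standard semantics of these joins rather than code from this PR; `emit` is a hypothetical helper, it assumes the probe side is the left side of the join, and RightOuter (where the roles are mirrored) is omitted.

```python
def emit(join_type, probe_row, matches):
    """Rows produced for one probe-side row, given its list of build-side matches."""
    if join_type == "inner":
        return [(probe_row, m) for m in matches]
    if join_type == "left_outer":
        # Keep the probe row even without a match, padded with None.
        return [(probe_row, m) for m in matches] or [(probe_row, None)]
    if join_type == "left_semi":
        # At most one output row, and only the probe side is returned.
        return [probe_row] if matches else []
    if join_type == "left_anti":
        return [] if matches else [probe_row]
    if join_type == "existence":
        # Every probe row is kept, with a boolean "exists" flag appended.
        return [(probe_row, bool(matches))]
    raise ValueError(f"unsupported join type: {join_type}")
```

Semi, anti, and existence joins only need to know whether a key matched, not which rows matched.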
   
   ### Why are the changes needed?
   
   Broadcast hash join is limited by driver memory and the executor broadcast 
threshold. Build-side tables between ~200 MB and ~2 GB are often too large to 
broadcast and fall back to expensive shuffle joins, which causes massive data 
movement on the probe side.
   
   **Production benchmark** (16k cores, 120TB memory cluster):
   
   | | Probe side | Build side | Strategy | Wall time |
   |---|---|---|---|---|
   | Before | ~5 PB | ~2 GB | Shuffle hash join (shuffle write + join) | ~5 hours |
   | After | ~5 PB | ~2 GB | Distributed map join (join only) | ~2 hours |
   
   **~60% wall-time reduction** by eliminating the probe-side shuffle entirely.
   
   This is similar in spirit to Hive's map join but distributed — no 
single-node bottleneck.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Adds a new SQL hint `DISTMAPJOIN` and the following configs:
   
   | Config | Default | Description |
   |---|---|---|
   | `spark.shard.enabled` | `false` | Enable shard service infrastructure at launch |
   | `spark.sql.execution.distributedMapJoin.maxInFlightNum` | `8` | Max concurrent RPC lookup batches per task |
   | `spark.sql.execution.distributedMapJoin.maxBatchSize` | `1024` | Max probe-side keys per RPC batch |
   | `spark.sql.execution.distributedMapJoin.bloomFilterCapacity` | `5242880` | Expected distinct keys per shard for bloom filter sizing |
   | `spark.sql.execution.distributedMapJoin.exchangeTimeout` | `30min` | Timeout for building shard data |
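
As a rough guide to what the defaults in the table above imply, the sketch below works out the maximum number of probe keys a single task can have in flight, and an estimated bloom filter size per shard. The bloom filter estimate uses the textbook sizing formula `m = -n * ln(p) / (ln 2)^2` with an assumed false-positive rate of 3%. The PR description does not state the rate or the sizing formula it uses, so treat that part as an assumption.

```python
import math

# Defaults copied from the config table above.
MAX_IN_FLIGHT_NUM = 8            # ...distributedMapJoin.maxInFlightNum
MAX_BATCH_SIZE = 1024            # ...distributedMapJoin.maxBatchSize
BLOOM_FILTER_CAPACITY = 5242880  # ...distributedMapJoin.bloomFilterCapacity

# Assumption: the false-positive rate is not given in the PR description.
ASSUMED_FPP = 0.03


def max_keys_in_flight(max_in_flight_num, max_batch_size):
    """Upper bound on probe keys awaiting a response for one task."""
    return max_in_flight_num * max_batch_size


def bloom_filter_bits(expected_keys, fpp):
    """Textbook optimal bit count for a bloom filter: -n * ln(p) / (ln 2)^2."""
    return math.ceil(-expected_keys * math.log(fpp) / (math.log(2) ** 2))


keys_in_flight = max_keys_in_flight(MAX_IN_FLIGHT_NUM, MAX_BATCH_SIZE)
bits_per_shard = bloom_filter_bits(BLOOM_FILTER_CAPACITY, ASSUMED_FPP)
mib_per_shard = bits_per_shard / 8 / (1024 * 1024)

print(f"keys in flight per task: {keys_in_flight}")
print(f"bloom filter per shard: {mib_per_shard:.2f} MiB at fpp={ASSUMED_FPP}")
```

With the defaults, a task has at most 8 × 1024 = 8192 keys in flight. Under the assumed 3% rate the formula gives roughly 7.3 bits per expected key, so raising `bloomFilterCapacity` increases the per-shard filter size linearly.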
   
   ### How was this patch tested?
   
   - `DistributedMapJoinSuite`: end-to-end tests on `local-cluster[2,1,512]` 
covering inner join, left/right outer join, left semi join, left anti join, 
with and without join conditions, null key handling, and multi-column join keys.
   - `DistributedMapJoinSuiteAE`: same test coverage with AQE enabled.
   - `ContextCleanerSuite`: extended to cover shard-set cleanup.
   - `CachedTableSuite`: extended with `shardSetCleaned` listener stub.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

