yugan95 opened a new pull request, #56542: URL: https://github.com/apache/spark/pull/56542
### What changes were proposed in this pull request?

This PR introduces **Distributed Map Join (DMJ)**, a new join strategy that avoids a full probe-side shuffle by building a distributed hash table service for the build side. Instead of shuffling, the probe side performs batched RPC lookups.

**How it works:**

1. The build side is hash-partitioned into a fixed number of shards and materialized as `HashedRelation` on executors, with configurable replica placement for fault tolerance.
2. A bloom filter is constructed over the build-side keys for probe-side pre-filtering.
3. Probe-side tasks buffer keys into batches, send async RPC lookups to the executors holding the relevant shards, and stream matched rows back. No shuffle is required on the probe side.

The strategy is triggered explicitly via a SQL hint:

```sql
SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ *
FROM probe_table
JOIN build_table ON probe_table.key = build_table.key
```

**Key components:**

| Component | Module | Description |
|---|---|---|
| `ShardManager` / `ShardManagerMaster` | core | Shard lifecycle, location tracking, replica placement |
| `ShardLookupService` | core | Netty-based RPC layer for shard data transfer (pluggable via `spark.shard.service`) |
| `ShardExchangeExec` | sql/core | Physical exchange operator that partitions the build side into shards |
| `DistributedMapJoinExec` | sql/core | Physical join operator with async batched RPC lookups |
| `ShardQueryStageExec` | sql/core | AQE integration |
| `ShardDistribution` / `ShardPartitioning` | sql/catalyst | Distribution and partitioning for the shard exchange |
| `BufferedShardRowMap` | sql/core | Off-heap key-value buffer for batching probe-side lookups |
| `ResolveHints` (extended) | sql/catalyst | Hint parsing for `DISTMAPJOIN` |

Supported join types: Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.

### Why are the changes needed?
Broadcast hash join is limited by driver memory and the executor broadcast threshold, so build-side tables between ~200 MB and 2 GB often fall back to expensive shuffle joins, which cause massive data movement on the probe side.

**Production benchmark** (cluster with 16k cores and 120 TB of memory):

| | Probe side | Build side | Strategy | Wall time |
|---|---|---|---|---|
| Before | ~5 PB | ~2 GB | Shuffle hash join (shuffle write + join) | ~5 hours |
| After | ~5 PB | ~2 GB | Distributed map join (join only) | ~2 hours |

This is a **~60% wall-time reduction** (from ~5 hours to ~2 hours), achieved by eliminating the probe-side shuffle entirely. The approach is similar in spirit to Hive's map join, but distributed, so there is no single-node bottleneck.

### Does this PR introduce _any_ user-facing change?

Yes. It adds a new SQL hint, `DISTMAPJOIN`, and the following configs:

| Config | Default | Description |
|---|---|---|
| `spark.shard.enabled` | `false` | Enables the shard service infrastructure at launch |
| `spark.sql.execution.distributedMapJoin.maxInFlightNum` | `8` | Maximum number of concurrent RPC lookup batches per task |
| `spark.sql.execution.distributedMapJoin.maxBatchSize` | `1024` | Maximum number of probe-side keys per RPC batch |
| `spark.sql.execution.distributedMapJoin.bloomFilterCapacity` | `5242880` | Expected number of distinct keys per shard, used to size the bloom filter |
| `spark.sql.execution.distributedMapJoin.exchangeTimeout` | `30min` | Timeout for building the shard data |

### How was this patch tested?

- `DistributedMapJoinSuite`: end-to-end tests on `local-cluster[2,1,512]` covering inner, left outer, right outer, left semi, and left anti joins, with and without join conditions, plus null-key handling and multi-column join keys.
- `DistributedMapJoinSuiteAE`: the same test coverage with AQE enabled.
- `ContextCleanerSuite`: extended to cover shard-set cleanup.
- `CachedTableSuite`: extended with a `shardSetCleaned` listener stub.
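Step 1 of the "How it works" list (hash-partitioning the build side into shards and placing replicas) can be illustrated with a minimal Python sketch. It is not the PR's implementation: the CRC32 hash, the round-robin replica placement, and the helper names `shard_of`, `place_replicas`, and `build_shards` are all hypothetical, since the PR does not say which hash `ShardPartitioning` uses or how `ShardManagerMaster` chooses replica hosts.

```python
import zlib
from collections import defaultdict


def shard_of(key, shard_count):
    """Map a join key to a shard id.

    Hypothetical: CRC32 over repr(key) stands in for whatever hash
    ShardPartitioning actually uses. It is deterministic across processes,
    unlike Python's built-in hash() for strings.
    """
    return zlib.crc32(repr(key).encode("utf-8")) % shard_count


def place_replicas(shard_count, replica_count, executors):
    """Assign each shard to `replica_count` distinct executors.

    Hypothetical round-robin policy with a per-shard offset, for illustration only.
    """
    if replica_count > len(executors):
        raise ValueError("replica_count cannot exceed the number of executors")
    return {
        shard: [executors[(shard + r) % len(executors)] for r in range(replica_count)]
        for shard in range(shard_count)
    }


def build_shards(build_rows, key_fn, shard_count):
    """Group build-side rows into per-shard hash tables (key -> list of rows).

    A list per key allows duplicate build keys, as a hashed relation must.
    """
    shards = [defaultdict(list) for _ in range(shard_count)]
    for row in build_rows:
        key = key_fn(row)
        shards[shard_of(key, shard_count)][key].append(row)
    return shards


# Usage mirroring the hint DISTMAPJOIN(build_table(shard_count=5, replica_count=3)).
executors = ["exec-0", "exec-1", "exec-2", "exec-3"]
placement = place_replicas(shard_count=5, replica_count=3, executors=executors)
shards = build_shards([(k, f"row-{k}") for k in range(20)], key_fn=lambda r: r[0], shard_count=5)
```

Offsetting each shard's replica list by its shard id spreads primary copies across executors and guarantees distinct hosts whenever `replica_count <= len(executors)`; a production policy might also weigh executor memory or host locality, which this sketch ignores.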
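Step 2 of "How it works" (the build-side bloom filter) and the `bloomFilterCapacity` config are related through the standard bloom-filter sizing formulas `m = ceil(-n * ln(p) / (ln 2)^2)` bits and `k = round((m / n) * ln 2)` hash functions, where `n` is the expected number of distinct keys and `p` the target false-positive rate. The Python sketch below assumes `p = 0.03` because the PR does not state a target rate, and it uses SHA-256-based double hashing; neither detail is taken from the DMJ code. Under that assumption, the default capacity of 5242880 keys gives `k = 5` and roughly 4.6 MiB of bits per shard.

```python
import hashlib
import math


class BloomFilter:
    """Minimal bloom filter sized from an expected key count (capacity)."""

    def __init__(self, capacity, fpp=0.03):
        # Standard sizing for n = capacity keys; fpp=0.03 is an assumption.
        self.num_bits = max(1, math.ceil(-capacity * math.log(fpp) / math.log(2) ** 2))
        self.num_hashes = max(1, round(self.num_bits / capacity * math.log(2)))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, key):
        # Double hashing: derive k bit positions from two 64-bit halves of one digest.
        digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1  # non-zero step
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def put(self, key):
        for pos in self._positions(key):
            self.bits[pos >> 3] |= 1 << (pos & 7)

    def might_contain(self, key):
        # False means "definitely absent"; True may be a false positive.
        return all(self.bits[pos >> 3] & (1 << (pos & 7)) for pos in self._positions(key))


# Build side: insert every build key. Probe side: keep only keys that may match.
bf = BloomFilter(capacity=1000)
for key in range(1000):
    bf.put(key)
to_lookup = [k for k in range(500, 1500) if bf.might_contain(k)]
```

Because a negative answer is definitive, probe keys that fail `might_contain` never need an RPC lookup; keys that pass may still be false positives and must be resolved against the shard.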
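Step 3 of "How it works", together with `maxBatchSize` and `maxInFlightNum`, describes a bounded producer loop: buffer probe keys per shard, flush a batch once it reaches the size limit, and wait whenever too many lookups are outstanding. The asyncio sketch below shows only that pattern. `rpc_lookup` is a hypothetical stand-in that reads a local dict instead of calling `ShardLookupService`, and routing by `key % 3` in the usage line is assumed for brevity.

```python
import asyncio
from collections import defaultdict


async def rpc_lookup(shard_tables, shard_id, keys, stats):
    """Hypothetical stand-in for an async RPC to the executor holding shard_id."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0)  # yield to the event loop, as a network call would
    stats["in_flight"] -= 1
    table = shard_tables[shard_id]
    return [(k, table[k]) for k in keys if k in table]


async def batched_probe(probe_keys, shard_of, shard_tables, max_batch_size=1024, max_in_flight=8):
    permits = asyncio.Semaphore(max_in_flight)
    stats = {"in_flight": 0, "peak": 0}
    buffers = defaultdict(list)  # shard id -> keys waiting to be sent
    tasks = []

    async def send(shard_id, batch):
        try:
            return await rpc_lookup(shard_tables, shard_id, batch, stats)
        finally:
            permits.release()  # free a slot once this batch's response arrives

    async def flush(shard_id):
        batch = buffers.pop(shard_id)
        await permits.acquire()  # backpressure: wait while max_in_flight batches are outstanding
        tasks.append(asyncio.create_task(send(shard_id, batch)))

    for key in probe_keys:
        shard_id = shard_of(key)
        buffers[shard_id].append(key)
        if len(buffers[shard_id]) >= max_batch_size:
            await flush(shard_id)
    for shard_id in list(buffers):  # flush partially filled batches at end of input
        await flush(shard_id)

    results = await asyncio.gather(*tasks)
    return [match for batch in results for match in batch], stats["peak"]


# Usage: 3 shards holding the even keys 0..98, probed with keys 0..99.
tables = {s: {k: f"v{k}" for k in range(0, 100, 2) if k % 3 == s} for s in range(3)}
matches, peak = asyncio.run(
    batched_probe(range(100), lambda k: k % 3, tables, max_batch_size=4, max_in_flight=2)
)
```

Acquiring the permit before creating the task, rather than inside it, makes the probe loop itself wait, so at most `max_in_flight` lookup batches are outstanding at any time. For simplicity the sketch collects all results at the end, whereas the PR describes streaming matched rows back as lookups complete.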
