yugan95 commented on PR #56542:
URL: https://github.com/apache/spark/pull/56542#issuecomment-4726773565

Thanks @sunchao for the thorough review! All issues except #8 are addressed in the latest push.
   
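**#1 outputOrdering**: Overrode `outputOrdering` to return `Nil`. Lookup results arrive through async RPC callbacks, which can complete in a different order than the probe rows were sent, so probe-side ordering is not preserved.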
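This standalone Scala simulation (not code from the PR) shows why probe-side ordering cannot be promised. Lookups are issued in sorted probe order, but their responses complete in a different order. The responses are modeled as `Promise`s, and each row is emitted from its completion callback. `sameThread` is a local helper that makes the run deterministic.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Promise}

// Runs callbacks on the completing thread so the simulation is deterministic.
val sameThread: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

val probeKeys = Seq(1, 2, 3)                       // probe rows arrive sorted
val responses = probeKeys.map(_ => Promise[Int]())
val output = mutable.ArrayBuffer.empty[Int]

// Each row is emitted from the callback of its own lookup.
probeKeys.zip(responses).foreach { case (key, response) =>
  response.future.foreach(_ => output += key)(sameThread)
}

// Remote shards answer out of request order.
Seq(2, 0, 1).foreach(i => responses(i).success(probeKeys(i)))
```

The emitted order here is 3, 1, 2. Because it depends on when responses arrive, the operator cannot advertise any sort order, and returning `Nil` expresses exactly that.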
   
**#2 Shard transport auth**: `NettyShardLookupService` now takes a `SecurityManager`. It wires in `AuthServerBootstrap`/`AuthClientBootstrap` when `spark.authenticate=true` and passes `getRpcSSLOptions()` to `SparkTransportConf`. This mirrors the setup in `NettyBlockTransferService`.
   
**#3 SinglePartition vs ShardDistribution(1)**: Added `case _: ShardDistribution => false` to `SinglePartition.satisfies0`, so a single-partition child no longer satisfies `ShardDistribution(1)`.
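A deliberately simplified Scala model of the `satisfies0` check shows the effect of the new case. `ShardDistribution` and the other types below are local stand-ins, not the real catalyst classes, and the `numShards` field is an assumption. The real `Partitioning.satisfies` also compares `requiredNumPartitions` before delegating to `satisfies0`.

```scala
// Simplified stand-ins for catalyst's Distribution hierarchy (not the real classes).
trait Distribution
case object AllTuples extends Distribution
case object BroadcastDistribution extends Distribution
case class ShardDistribution(numShards: Int) extends Distribution  // hypothetical field

trait Partitioning {
  def numPartitions: Int
  def satisfies0(required: Distribution): Boolean
}

case object SinglePartition extends Partitioning {
  val numPartitions = 1
  override def satisfies0(required: Distribution): Boolean = required match {
    case BroadcastDistribution => false
    // New case: without it, the catch-all below reports ShardDistribution(1) as satisfied.
    case _: ShardDistribution => false
    case _ => true
  }
}
```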
   
**#4 Task cancellation buffer leak**: Added a `@volatile cancelled` flag. The task completion listener drains `lookupQueue`, releases all buffers, closes `currentReader`, and then frees the map. Callbacks check `cancelled` and release their buffers on both the success and failure paths.
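The cancellation pattern above can be sketched in plain Scala with no Spark or Netty dependencies. `Buf`, `Reader`, and `LookupIteratorSketch` are hypothetical stand-ins for the PR's managed buffers, `currentReader`, and iterator, and only the success callback is shown. The second `cancelled` check after enqueueing is one assumed way to close the race between a late callback and the listener's drain.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical reference-counted buffer (stand-in for a Netty-backed buffer).
final class Buf {
  val refCnt = new AtomicInteger(1)
  def release(): Unit = refCnt.decrementAndGet()
}

// Hypothetical reader held by the iterator.
final class Reader {
  @volatile var closed = false
  def close(): Unit = closed = true
}

final class LookupIteratorSketch {
  @volatile private var cancelled = false
  private val lookupQueue = new ConcurrentLinkedQueue[Buf]()
  @volatile var currentReader: Reader = new Reader
  @volatile var map: Map[Int, String] = Map(1 -> "row")

  // poll() is atomic, so no buffer is released twice even if two drains overlap.
  private def drainQueue(): Unit = {
    var buf = lookupQueue.poll()
    while (buf != null) { buf.release(); buf = lookupQueue.poll() }
  }

  // Success callback from the RPC layer, possibly on another thread.
  def onLookupSuccess(buf: Buf): Unit =
    if (cancelled) buf.release()
    else {
      lookupQueue.add(buf)
      // The listener may have drained between the check and add(); drain again.
      if (cancelled) drainQueue()
    }

  // Body of the task completion listener.
  def onTaskCompletion(): Unit = {
    cancelled = true  // set first so later callbacks release instead of enqueueing
    drainQueue()
    if (currentReader != null) currentReader.close()
    currentReader = null
    map = null
  }
}

val iter = new LookupIteratorSketch
val reader = iter.currentReader
val early = new Buf
val late = new Buf
iter.onLookupSuccess(early)  // queued while the task is running
iter.onTaskCompletion()      // drains the queue and releases `early`
iter.onLookupSuccess(late)   // arrives after cancellation: released immediately
```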
   
**#5 Replica installation sync**: `installReplicaSet` now uses `askSync` with an `RpcTimeout` derived from the exchange timeout (default 30 min). The endpoint collects the replica futures via `Future.sequence` and replies only after all of them complete.
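The reply-after-all-replicas pattern can be sketched with the Scala standard library alone. `installReplica` and `handleInstallReplicaSet` are hypothetical names. The reply channel is modeled as a `Promise` rather than Spark's `RpcCallContext`, and `Await.result` stands in for the blocking `askSync` call bounded by the exchange timeout.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical: install one replica on an executor asynchronously.
def installReplica(executorId: String): Future[String] = Future {
  Thread.sleep(10)  // simulate transfer time
  s"installed on $executorId"
}

// Endpoint side: complete the reply only after every replica future finishes.
// If any install fails, Future.sequence fails and that failure is sent back instead.
def handleInstallReplicaSet(executorIds: Seq[String], reply: Promise[Seq[String]]): Unit =
  Future.sequence(executorIds.map(installReplica)).onComplete(result => reply.complete(result))

// Caller side: block like askSync, bounded by the exchange timeout (30 min default per the PR).
val exchangeTimeout = 30.minutes
val reply = Promise[Seq[String]]()
handleInstallReplicaSet(Seq("exec-1", "exec-2"), reply)
val installed = Await.result(reply.future, exchangeTimeout)
```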
   
**#6 Driver exclusion**: In non-local mode, `DRIVER_IDENTIFIER` is filtered out of the candidate list. In local mode the driver stays eligible, since it is the only executor.
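Here is a minimal sketch of the candidate filtering. `shardHostCandidates` is a hypothetical helper. The `"driver"` value matches Spark's `SparkContext.DRIVER_IDENTIFIER` and is redefined locally so the snippet runs without Spark.

```scala
// Same value as SparkContext.DRIVER_IDENTIFIER; redefined so this runs without Spark.
val DRIVER_IDENTIFIER = "driver"

// Hypothetical helper: pick the executors that may host shards.
def shardHostCandidates(executorIds: Seq[String], isLocal: Boolean): Seq[String] =
  if (isLocal) executorIds  // local mode: the driver is the only executor, so keep it
  else executorIds.filterNot(_ == DRIVER_IDENTIFIER)
```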
   
**#7 hashJoinSupported**: `createDistributedMapJoin` is now guarded with `if (hashJoinSupport)`, consistent with the broadcast hash join (BHJ) and shuffled hash join (SHJ) paths.
   
**#8 Response size bound**: I will address this in a follow-up. Chunked responses require protocol changes that deserve a separate review.
   
**#9 Exchange cleanup**: Failed exchanges now call `unpersist` in the catch block. `unpersist` also calls `master.removeShardSet` (via a new `RemoveShardSet` message) to clean up `shardSetInfo`, `shardSetLocations`, and each manager's `_shards`.
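The cleanup path can be sketched with hypothetical in-memory master and manager classes. They reuse the field names from this comment (`shardSetInfo`, `shardSetLocations`, `_shards`). In the PR, manager cleanup goes over RPC through `RemoveShardSet`; here it is replaced by a direct call, and the map value types are assumptions.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical per-executor manager: shard data keyed by (shardSetId, shardIndex).
final class ShardManagerSketch {
  val _shards = mutable.Map.empty[(Long, Int), Array[Byte]]
  def removeShardSet(id: Long): Unit =
    _shards.keys.filter(_._1 == id).toList.foreach(key => _shards.remove(key))
}

// Hypothetical master-side bookkeeping.
final class ShardMasterSketch(managers: Seq[ShardManagerSketch]) {
  val shardSetInfo = mutable.Map.empty[Long, Int]               // id -> number of shards
  val shardSetLocations = mutable.Map.empty[Long, Seq[String]]  // id -> executor ids
  def removeShardSet(id: Long): Unit = {
    shardSetInfo.remove(id)
    shardSetLocations.remove(id)
    managers.foreach(_.removeShardSet(id))  // an RPC (RemoveShardSet) in the real code
  }
}

def unpersist(master: ShardMasterSketch, id: Long): Unit = master.removeShardSet(id)

// Build a shard set; on failure, clean up partial state, then rethrow.
def runExchange(master: ShardMasterSketch, id: Long)(build: => Unit): Unit =
  try build
  catch {
    case NonFatal(e) =>
      unpersist(master, id)
      throw e
  }

val manager = new ShardManagerSketch
val master = new ShardMasterSketch(Seq(manager))
master.shardSetInfo(7L) = 2
master.shardSetLocations(7L) = Seq("exec-1")
manager._shards((7L, 0)) = Array[Byte](1)
val outcome = scala.util.Try(runExchange(master, 7L) { throw new RuntimeException("build failed") })
```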
   
**#10 EOF flush throttling**: `flushLookup` now enforces the `maxInFlightNum` limit. Added `checkValue(_ > 0)` to both the `maxInFlightNum` and `maxBatchSize` configs.
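Here is a plain-Scala sketch of EOF flush throttling. It assumes lookups are grouped into batches of at most `maxBatchSize` keys and each batch is one RPC. `LookupBatcher`, `add`, and `onBatchDone` are hypothetical, and `require` plays the role of the configs' `checkValue(_ > 0)` validation.

```scala
import scala.collection.mutable

// Hypothetical batcher; only maxBatchSize, maxInFlightNum, and flushLookup come from the PR.
final class LookupBatcher(maxBatchSize: Int, maxInFlightNum: Int, send: Seq[Int] => Unit) {
  // Same constraint the PR adds to both configs with checkValue(_ > 0).
  require(maxBatchSize > 0, "maxBatchSize must be positive")
  require(maxInFlightNum > 0, "maxInFlightNum must be positive")

  private val pending = mutable.Queue.empty[Int]
  private var inFlight = 0

  def add(key: Int): Unit = {
    pending.enqueue(key)
    if (pending.size >= maxBatchSize) flushLookup()
  }

  // Also called at EOF: never exceeds maxInFlightNum outstanding batches.
  def flushLookup(): Unit =
    while (pending.nonEmpty && inFlight < maxInFlightNum) {
      val batch = Seq.fill(math.min(maxBatchSize, pending.size))(pending.dequeue())
      inFlight += 1
      send(batch)
    }

  // A completed RPC frees a slot; keys held back by the limit are sent now.
  def onBatchDone(): Unit = {
    inFlight -= 1
    flushLookup()
  }

  def inFlightCount: Int = inFlight
  def pendingCount: Int = pending.size
}

val sent = mutable.ArrayBuffer.empty[Seq[Int]]
val batcher = new LookupBatcher(maxBatchSize = 2, maxInFlightNum = 1, send = batch => sent += batch)
(1 to 5).foreach(batcher.add)
batcher.flushLookup()  // EOF flush: still capped at one batch in flight
val afterEof = (sent.size, batcher.inFlightCount, batcher.pendingCount)
batcher.onBatchDone()
batcher.onBatchDone()
```

With `maxInFlightNum = 1`, the EOF flush leaves keys 3, 4, and 5 queued instead of sending them all at once. They go out as earlier batches complete.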
   
**#11 Request retain leak**: Added `reqMsg.release()` in the `fetchBatch` catch block, before calling `onBatchFetchFailure`.
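Here is a minimal sketch of the fix. It assumes `fetchBatch` retains the request for the transport and that the transport releases it only after a successful send. `ReqMsg` and the `send` function are hypothetical stand-ins for the Netty message and the client call.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Hypothetical reference-counted request message.
final class ReqMsg {
  val refCnt = new AtomicInteger(1)
  def retain(): ReqMsg = { refCnt.incrementAndGet(); this }
  def release(): Unit = refCnt.decrementAndGet()
}

def fetchBatch(reqMsg: ReqMsg, send: ReqMsg => Unit, onBatchFetchFailure: Throwable => Unit): Unit = {
  reqMsg.retain()  // reference handed to the transport
  try send(reqMsg)
  catch {
    case NonFatal(e) =>
      reqMsg.release()  // the failed send never consumed the retained reference
      onBatchFetchFailure(e)
  }
}

val req = new ReqMsg
var failure: Option[Throwable] = None
fetchBatch(req, _ => throw new java.io.IOException("connection reset"), e => failure = Some(e))
req.release()  // the caller drops its own reference
```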
   
This push also includes CI fixes (binding policies, MiMa, ContextCleanerSuite, Java checkstyle).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
