Yicong-Huang commented on PR #54291:
URL: https://github.com/apache/spark/pull/54291#issuecomment-3895405655

   > That doesn't sound right because `sc.getLocalProperty` is thread-local. 
Other threads should not be able to write to it.
   
   First, regarding the value being null — `executionId` is a `Long` from 
`nextExecutionId`, so `executionId.toString` can never be null. And 
`setLocalProperty` only calls `remove(key)` when `value == null`, which isn't 
the case here. So the null doesn't come from the value being set.
   
   Good question — I initially thought the same, but it turns out 
`localProperties` is not always truly thread-local.
   
   `localProperties` is declared as `InheritableThreadLocal[Properties]`, so 
the `Properties` **reference** is per-thread. However, 
`AsyncRDDActions.takeAsync` breaks this isolation:
   
   ```scala
   // AsyncRDDActions.scala line 71 — no clone!
   val localProperties = self.context.getLocalProperties
   
   // line 111 — sets the SAME Properties object on a thread pool thread
   self.context.setLocalProperties(localProperties)
   ```
   
   `getLocalProperties` returns the raw `Properties` reference (not a copy). 
When `takeAsync` passes it to a thread pool thread via `setLocalProperties`, 
both the original thread and the pool thread end up sharing the **same** 
`Properties` instance. After that, any mutation from either thread (e.g., 
`setLocalProperty` → `Properties.setProperty`, or clearing via 
`Properties.remove`) is visible to both.
   
   This is inconsistent with every other cross-thread property propagation in 
the codebase — `withThreadLocalCaptured`, `DAGScheduler`, `JobScheduler`, 
`SparkOperation` all use `Utils.cloneProperties` before handing properties to 
another thread. `takeAsync` is the only one that doesn't.
   
   So the race scenario would be:
   
   1. Thread A (outer execution): sets `EXECUTION_ROOT_ID_KEY = "1"` (line 112)
   2. `takeAsync` shares the same `Properties` object with Thread B
   3. Thread A: line 111 reads `EXECUTION_ROOT_ID_KEY` → `"1"` (not null, skip 
setting)
   4. Thread B (outer execution's finally): clears `EXECUTION_ROOT_ID_KEY` → 
`null` (line 267, on the shared object)
   5. Thread A: line 115 reads `EXECUTION_ROOT_ID_KEY` → `null` → 
`NumberFormatException`
   
   I've pushed a separate commit to fix `takeAsync` by adding the missing 
`Utils.cloneProperties` call, consistent with the rest of the codebase. The 
original fix in this PR (reusing the value instead of re-reading) is still 
worthwhile as it removes the unnecessary redundancy regardless.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to