Ma77Ball opened a new issue, #5592:
URL: https://github.com/apache/texera/issues/5592

   ### Task Summary
   
   `ReservoirSamplingOpExec` allocates a fixed-size reservoir `Array[Tuple]` of
length `count` (the per-worker share of `k`). Let `n` be the number of tuples the
worker has processed. When `n < count` (the input stream is smaller than `k`, or
upstream partitioning is skewed so that a worker receives fewer tuples than its
share), `processTuple` fills only slots `0 until n` and leaves the trailing
`count - n` slots `null`. `onFinish` then returns the whole array verbatim
(`reservoir.iterator`), so the iterator yields `n` real tuples followed by
`count - n` `null` entries.
   
   This is not currently user-visible: the worker's data-processor loop has a 
null-guard (`DataProcessor.scala`, around line 157: `if (outputTuple == null) 
return`) that silently skips any null output tuple before it is serialized or 
passed downstream, so the end-to-end result is correct and downstream operators 
never see the nulls. The issue is latent: the operator violates the implicit 
"do not emit null tuples" contract and relies on a distant global guard to 
clean up after it. If that guard is ever removed, narrowed, or bypassed (a new 
output path, a batching change, or a code path that inspects tuples before the 
guard), the nulls would leak into serialization or downstream operators and 
cause incorrect results or NullPointerExceptions.
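The masking effect can be illustrated with two hypothetical output paths. Both functions below are invented for illustration and are not Texera's `DataProcessor`: `drainGuarded` mimics a skip-on-null check before "serialization", while `drainUnguarded` stands in for a new output route that reads tuple fields first. `Row` is again a hypothetical stand-in for `Tuple`.

```scala
import scala.util.Try

// Hypothetical stand-in for Texera's Tuple type (illustration only).
final case class Row(id: Int)

// Guarded path: skips null tuples before "serializing" them, mimicking the
// engine-side check described above (not Texera's actual DataProcessor).
def drainGuarded(out: Iterator[Row]): List[String] =
  out
    .filter(t => t != null)          // nulls are dropped silently
    .map(t => s"id=${t.id}")         // stand-in for serialization
    .toList

// Unguarded path: a hypothetical output route that reads tuple fields
// without checking for null first.
def drainUnguarded(out: Iterator[Row]): List[String] =
  out.map(t => s"id=${t.id}").toList // t.id throws on a null tuple

// Output of a sampler that padded its reservoir with two nulls.
println(drainGuarded(Iterator(Row(1), Row(2), null, null))) // List(id=1, id=2)

val unguarded = Try(drainUnguarded(Iterator(Row(1), Row(2), null, null)))
println(unguarded.isFailure) // true (NullPointerException)
```

The same padded output is harmless on the guarded path and fails on the unguarded one, which is why the fix belongs in the operator rather than in the guard.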
   
   Proposed change: emit only the filled prefix in `onFinish`:
   
   ```scala
   override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
   ```
   
   `take(n)` returns all `count` elements when `n >= count` (the worker received
at least its share of `k`, so behavior is unchanged) and exactly the `n` filled
slots when `n < count` (the worker received fewer tuples than its share, so no
null padding is emitted). This keeps the operator's output contract clean
without depending on the engine's null guard.
   
   File: `common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala` (`onFinish`).
Regression coverage: the `input size < k` case in `ReservoirSamplingOpExecSpec`,
which fails against `reservoir.iterator` and passes with
`reservoir.iterator.take(n)`.
   
   ### Task Type
   
   - [x] Refactor / Cleanup
   - [ ] DevOps / Deployment / CI
   - [ ] Testing / QA
   - [ ] Documentation
   - [ ] Performance
   - [ ] Other

