Ma77Ball opened a new issue, #5592:

URL: https://github.com/apache/texera/issues/5592
### Task Summary

`ReservoirSamplingOpExec` allocates a fixed-size reservoir `Array[Tuple]` of length `count` (the per-worker share of `k`). When a worker receives fewer tuples than `count` (the input stream is smaller than `k`, or upstream partitioning is skewed so a worker gets fewer than its share), `processTuple` fills only slots `0 until n` and leaves the trailing `count - n` slots `null`. `onFinish` then returns the whole array verbatim (`reservoir.iterator`), so the iterator yields `n` real tuples followed by `count - n` `null` entries.

This is not currently user-visible. The worker's data-processor loop has a null guard (`DataProcessor.scala`, around line 157: `if (outputTuple == null) return`) that silently skips any null output tuple before it is serialized or passed downstream, so the end-to-end result is correct and downstream operators never see the nulls.

The issue is latent: the operator violates the implicit "do not emit null tuples" contract and relies on a distant global guard to clean up after it. If that guard is ever removed, narrowed, or bypassed (a new output path, a batching change, or a code path that inspects tuples before the guard), the nulls would leak into serialization or downstream operators and cause incorrect results or NullPointerExceptions.

Proposed change: emit only the filled prefix in `onFinish`:

```scala
override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
```

`take(n)` returns all `count` elements when `n >= count` (input >= k, behavior unchanged) and exactly the `n` filled slots when `n < count` (input < k, no null padding). This keeps the operator's output contract clean without depending on the engine's null guard.

File: `common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala` (`onFinish`).
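To make the padding behavior concrete, here is a minimal standalone sketch. It is not Texera's code: `ReservoirSketch`, `onFinishCurrent`, and `onFinishProposed` are hypothetical names, the element type is a generic `A` instead of `Tuple`/`TupleLike`, and the replacement step is assumed to be textbook Algorithm R. It only illustrates how a fixed-size array with `n < count` filled slots produces trailing nulls, and how `take(n)` trims them.

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Hypothetical, self-contained stand-in for ReservoirSamplingOpExec.
// `count` plays the role of the per-worker reservoir size and `n` the number
// of tuples seen so far. The replacement step is textbook Algorithm R, which
// is an assumption here, not code copied from Texera.
final class ReservoirSketch[A >: Null : ClassTag](count: Int, seed: Long) {
  private val rand = new Random(seed)
  // Fixed-size array: for a reference type every slot starts out as null.
  private val reservoir = new Array[A](count)
  private var n = 0

  def processTuple(t: A): Unit = {
    if (n < count) {
      // Fill phase: while n < count, only slots 0 until n have been written.
      reservoir(n) = t
    } else {
      // Replacement phase: keep t with probability count / (n + 1).
      val j = rand.nextInt(n + 1)
      if (j < count) reservoir(j) = t
    }
    n += 1
  }

  // Mirrors the current onFinish: yields every slot, including the
  // trailing nulls when fewer than `count` tuples arrived.
  def onFinishCurrent(): Iterator[A] = reservoir.iterator

  // Mirrors the proposed onFinish: take(n) yields all `count` slots when
  // n >= count and only the filled prefix when n < count.
  def onFinishProposed(): Iterator[A] = reservoir.iterator.take(n)
}
```

For example, feeding `"a"`, `"b"`, `"c"` into `new ReservoirSketch[String](count = 5, seed = 42L)` makes `onFinishCurrent().toList` return `List(a, b, c, null, null)`, while `onFinishProposed().toList` returns `List(a, b, c)`. With ten inputs, both variants yield exactly five non-null elements. Because `Iterator.take` is lazy and simply stops early when `n` exceeds the array length, no `math.min(n, count)` or array copy is needed.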
Regression coverage: the `input size < k` case in `ReservoirSamplingOpExecSpec` fails against `reservoir.iterator` and passes with `reservoir.iterator.take(n)`.

### Task Type

- [x] Refactor / Cleanup
- [ ] DevOps / Deployment / CI
- [ ] Testing / QA
- [ ] Documentation
- [ ] Performance
- [ ] Other
