Ma77Ball opened a new issue, #5409:
URL: https://github.com/apache/texera/issues/5409
### What happened?
`ReservoirSamplingOpExec` emits `null` "tuples" from `onFinish` when the
input stream contains **fewer tuples than k** (the per-worker reservoir size).
The reservoir is a fixed-size array allocated to `count` (this worker's
share of k):
```scala
// common/workflow-operator/.../reservoirsampling/ReservoirSamplingOpExec.scala
override def open(): Unit = {
  n = 0
  reservoir = Array.ofDim(count) // fixed size = count
}

override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator
```
When `n < count`, `processTuple` only fills `reservoir(0 until n)`; the
trailing `count - n` slots stay `null`. `onFinish` returns
`reservoir.iterator`, so it yields those `null` slots into the operator's
output stream.
**Expected:** when fewer than k tuples arrive, the operator should emit
exactly the `n` buffered tuples and nothing else (reservoir sampling of a
stream shorter than k returns the whole stream).
### Severity and current impact
**Low, currently latent, not crashing.** The emitted nulls are silently
dropped downstream by a defensive guard in the worker:
```scala
// amber/.../worker/DataProcessor.scala (outputOneTuple)
if (outputTuple == null) return // line 157
```
This guard sits *before* `passTupleToDownstream`, storage writes, and
output-count statistics, so today there is no NPE, no corrupt output, and the
row count is correct. The bug still matters because:
- It violates the operator contract (`onFinish` should emit valid tuples)
and relies entirely on a defensive null-check elsewhere in the engine. Any
consumer that iterates the operator output without that guard (a refactor, a
different execution path, direct unit testing) would observe or mishandle the
nulls.
- It over-allocates and iterates dead reservoir slots.
### How to reproduce?
Construct the executor with `k = 5`, feed 3 tuples, then drain `onFinish`:
```
processTuple(t0); processTuple(t1); processTuple(t2)
onFinish(0).toList // => List(t0, t1, t2, null, null) <- two trailing nulls
```
(The non-null prefix is correct; the trailing nulls are the bug.)
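For anyone who wants to see the behaviour without a Texera checkout, here is a minimal standalone sketch. `ReservoirModel` is a hypothetical stand-in, not the real `ReservoirSamplingOpExec`: it uses `String` in place of `TupleLike`, and its replacement step is an assumption (textbook reservoir sampling), since the real `processTuple` body is not quoted in this issue. Only the fixed-size array and the `reservoir.iterator` drain mirror the code above.

```scala
import scala.util.Random

// Hypothetical stand-in for the operator; String plays the role of TupleLike.
class ReservoirModel(count: Int, seed: Long = 0L) {
  private val rand = new Random(seed)
  private var n = 0
  // Reference-typed array: every slot starts out as null.
  private val reservoir: Array[String] = Array.ofDim[String](count)

  def processTuple(t: String): Unit = {
    if (n < count) {
      reservoir(n) = t // fill phase: slots 0 until count
    } else {
      // Assumed replacement step (classic reservoir sampling).
      val i = rand.nextInt(n + 1)
      if (i < count) reservoir(i) = t
    }
    n += 1
  }

  // Mirrors the reported onFinish: walks every slot, filled or not.
  def onFinish(): Iterator[String] = reservoir.iterator
}

object Repro {
  def main(args: Array[String]): Unit = {
    val m = new ReservoirModel(count = 5)
    Seq("t0", "t1", "t2").foreach(m.processTuple)
    println(m.onFinish().toList) // List(t0, t1, t2, null, null)
  }
}
```

With only three inputs the replacement branch never runs, so the output is deterministic regardless of the seed.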
### Suggested fix
Emit only the filled slots, a one-line change in `onFinish`:
```scala
override def onFinish(port: Int): Iterator[TupleLike] =
  reservoir.iterator.take(n)
// or: reservoir.iterator.filter(_ != null)
```
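`take(n)` is preferable: it stops after the populated prefix, needs no
per-element null check, and `n` already tracks how many slots are populated
while `n < count`. If `n` grows past `count`, `take(n)` simply yields the
whole reservoir, because `Iterator.take` never returns more elements than
the underlying iterator has.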
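A regression check for the fix could look roughly like the sketch below. `drain` is a hypothetical helper that isolates the proposed `onFinish` body so it can be exercised without the executor; it is not an existing Texera function. The sketch covers the three cases of interest: a short stream, an empty stream, and `n` larger than the reservoir.

```scala
object FixSketch {
  // Hypothetical helper: the proposed onFinish body, lifted out for testing.
  // Emits only the first n slots; take() stops early if the array is shorter.
  def drain[T](reservoir: Array[T], n: Int): Iterator[T] =
    reservoir.iterator.take(n)

  def main(args: Array[String]): Unit = {
    // Reservoir of size 5 after only 3 tuples arrived.
    val partial: Array[String] = Array("t0", "t1", "t2", null, null)
    assert(drain(partial, 3).toList == List("t0", "t1", "t2"))

    // Empty input: nothing buffered, nothing emitted.
    assert(drain(partial, 0).toList == Nil)

    // n larger than the reservoir: the whole reservoir is emitted.
    val full: Array[String] = Array("a", "b", "c", "d", "e")
    assert(drain(full, 12).toList == full.toList)

    println("all checks passed")
  }
}
```

The `filter(_ != null)` variant passes the same three checks, but it inspects every slot and would also silently hide a `null` stored in the reservoir for any other reason.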
### Branch
main
### Commit Hash (Optional)
7deed35fd