He-Pin opened a new issue, #3134:
URL: https://github.com/apache/pekko/issues/3134
### Description
`UnfoldResourceSourceAsync` calls `close(resource)` in `postStop()` without
waiting for any in-flight `readData(resource)` `Future` to complete. This can
cause `close()` and `readData()` to run concurrently on the same resource.
### Location
`stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala:115-118`
```scala
override def postStop(): Unit = maybeResource match {
  case OptionVal.Some(resource) => close(resource)
  case _                        => // do nothing
}
```
### Race condition
1. `onPull()` is called → `readData(resource)` returns an incomplete
`Future` → `readCallback` is registered (`UnfoldResourceSourceAsync.scala:105`)
2. Downstream cancels → the stage is stopped → `postStop()` is invoked →
`close(resource)` is called fire-and-forget
(`UnfoldResourceSourceAsync.scala:116`)
3. `close(resource)` and the still-running `readData(resource)` now operate
concurrently on the same resource `R`
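
The three steps above can be reproduced without Pekko. The following is a minimal sketch using plain `scala.concurrent` futures; `FakeCursor` and `RaceSketch` are hypothetical names introduced for illustration only and are not part of `UnfoldResourceSourceAsync`. It only shows that a close issued while a read is pending overlaps with that read.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical resource that records whether close() was called while a read was pending.
final class FakeCursor {
  private val reading = new AtomicBoolean(false)
  val closedDuringRead = new AtomicBoolean(false)

  // The read stays pending until `release` completes.
  def read(release: Future[Unit]): Future[Option[Int]] = {
    reading.set(true)
    release.map { _ =>
      reading.set(false)
      Some(1)
    }
  }

  def close(): Future[Unit] = {
    if (reading.get()) closedDuringRead.set(true)
    Future.successful(())
  }
}

object RaceSketch {
  def run(): Boolean = {
    val cursor = new FakeCursor
    val release = Promise[Unit]()
    val inFlight = cursor.read(release.future) // step 1: read is still pending
    cursor.close()                             // step 2: postStop-style close, not waiting for the read
    release.success(())                        // step 3: the read only finishes afterwards
    Await.result(inFlight, 3.seconds)
    cursor.closedDuringRead.get()
  }
}
```

`RaceSketch.run()` reports whether `close()` overlapped with the pending read; with the ordering above it does.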
### Consequences
- **Resource corruption**: if the user's `close` and `readData`
implementations are not safe to call concurrently (e.g., closing a database
cursor while reading from it), the resource may be corrupted or throw exceptions
- **Data loss**: `readData` may return valid data that is never delivered
because the stage has already stopped
- **Double-close**: when `readData` eventually completes with `None`, the
normal end-of-stream path (`UnfoldResourceSourceAsync.scala:82-87`) may call
`close(resource)` again, on top of the `close` already initiated by `postStop`
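
Until the stage itself is fixed, user code could at least guard against the double-close case by making its own `close` idempotent. The sketch below assumes a hypothetical `CloseOnce` wrapper (not a Pekko API). It does not address the concurrent `close`/`readData` problem, only repeated invocations of `close`.

```scala
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal

// Hypothetical wrapper: the first call runs `underlying`, later calls reuse the same result.
final class CloseOnce(underlying: () => Future[Unit]) {
  private val result = new AtomicReference[Future[Unit]](null)

  def apply(): Future[Unit] = {
    val p = Promise[Unit]()
    if (result.compareAndSet(null, p.future)) {
      // We won the race: run the real close and publish its outcome.
      p.completeWith(try underlying() catch { case NonFatal(e) => Future.failed(e) })
      p.future
    } else {
      // Someone else already started closing: hand back their future.
      result.get()
    }
  }
}

object CloseOnceDemo {
  def run(): Int = {
    val calls = new AtomicInteger(0)
    val close = new CloseOnce(() => { calls.incrementAndGet(); Future.successful(()) })
    close()
    close()
    calls.get()
  }
}
```

A user would pass `() => closeOnce()` (or equivalent) as the `close` function of `Source.unfoldResourceAsync`, so that a second invocation becomes a no-op.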
### Why `UnfoldAsync` is NOT affected
`UnfoldAsync`
(`stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala:87-122`) also
has an in-flight `Future` from `f(state)` when downstream cancels, but it has
no `postStop` and no external resource to clean up. The in-flight `Future`
simply completes and its callback is rejected by `getAsyncCallback` (stage
already stopped). There is no concurrent `close()` call. `UnfoldAsyncJava`
(lines 129-179) has the same property.
### Why the sync `UnfoldResourceSource` is NOT affected
`UnfoldResourceSource`
(`stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala`)
calls `readData(resource)` synchronously on the interpreter thread.
`postStop()` also runs on the interpreter thread. There is no window for
concurrent execution.
### Suggested fix
Track the in-flight `readData` `Future` in a `var` field. In `postStop()`,
instead of calling `close(resource)` directly, chain `close(resource)` to run
after the in-flight read completes (or immediately if no read is in flight).
For example:
```scala
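// Tracks the read currently in flight; null when no read is pending.
private var inFlightRead: Future[Option[T]] = _

override def onPull(): Unit = maybeResource match {
  case OptionVal.Some(resource) =>
    try {
      val future = readData(resource)
      inFlightRead = future
      future.value match {
        case Some(value) =>
          // Already completed: handle synchronously, nothing left in flight.
          inFlightRead = null
          handle(value)
        case None =>
          // NOTE: this callback runs on the thread that completes the future,
          // not on the interpreter thread, so the field may need @volatile
          // (or another form of safe publication) in a real implementation.
          future.onComplete { result =>
            inFlightRead = null
            readCallback(result)
          }(parasitic)
      }
    } catch errorHandler
  case OptionVal.None =>
}

override def postStop(): Unit = maybeResource match {
  case OptionVal.Some(resource) =>
    val pending = inFlightRead
    if (pending != null) {
      // Defer close until the in-flight read has completed (success or failure).
      pending.onComplete(_ => close(resource))(parasitic)
    } else {
      close(resource)
    }
  case _ =>
}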
```
The key constraint is: `close(resource)` must not be invoked until any
in-flight `readData(resource)` has completed.
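
The ordering constraint can also be checked in isolation. The following is a minimal sketch with plain `scala.concurrent` futures; `closeAfter`, `CloseOrdering`, and `CloseOrderingDemo` are hypothetical names, and the global execution context is an assumption made to keep the example self-contained (the snippet above uses `parasitic`).

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._

object CloseOrdering {
  // Hypothetical helper: runs `close` only after `pending` (if any) has completed,
  // regardless of whether the read succeeded or failed.
  def closeAfter[A](pending: Option[Future[A]])(close: () => Future[Unit])(
      implicit ec: ExecutionContext): Future[Unit] =
    pending match {
      case Some(read) => read.transformWith(_ => close())
      case None       => close()
    }
}

object CloseOrderingDemo {
  import ExecutionContext.Implicits.global

  def run(): List[String] = {
    val events = new ConcurrentLinkedQueue[String]()
    val read = Promise[Option[Int]]()
    // Stand-in for an in-flight readData: records when the read has finished.
    val tracked = read.future.map { v => events.add("read-done"); v }
    val closed = CloseOrdering.closeAfter(Some(tracked)) { () =>
      events.add("close")
      Future.successful(())
    }
    read.success(None) // the read completes only after close was requested
    Await.result(closed, 3.seconds)
    events.toArray.toList.map(_.toString)
  }
}
```

Even though close is requested while the read is still pending, the recorded order is `read-done` followed by `close`, which is the behavior the fix in `postStop()` aims for.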
### Affected operators
- `Source.unfoldResourceAsync` — directly affected
- `FlattenMerge` / `FlattenConcat` fast paths — not affected (no
`UnfoldResourceSourceAsync` fast path exists in these operators)
### References
- None - identified during internal code review