He-Pin opened a new issue, #3134:
URL: https://github.com/apache/pekko/issues/3134

   ### Description
   
   `UnfoldResourceSourceAsync` calls `close(resource)` in `postStop()` without 
waiting for any in-flight `readData(resource)` `Future` to complete. This can 
cause `close()` and `readData()` to run concurrently on the same resource.
   
   ### Location
   
   `stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala:115-118`
   
   ```scala
   override def postStop(): Unit = maybeResource match {
     case OptionVal.Some(resource) => close(resource)
     case _                        => // do nothing
   }
   ```
   
   ### Race condition
   
   1. `onPull()` is called → `readData(resource)` returns an incomplete 
`Future` → `readCallback` is registered (`UnfoldResourceSourceAsync.scala:105`)
   2. Downstream cancels → the stage is stopped → `postStop()` is invoked → 
`close(resource)` is called fire-and-forget: nothing waits for the pending 
read, and the `Future` returned by `close` is not awaited 
(`UnfoldResourceSourceAsync.scala:116`)
   3. `close(resource)` and the still-running `readData(resource)` now operate 
concurrently on the same resource `R`
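
   The interleaving above can be modelled without Pekko. The following is a 
minimal sketch, not the stage's actual code: `FakeCursor` and `RaceModel` are 
hypothetical names, and the three steps are replayed on a single thread so the 
outcome is deterministic.

   ```scala
   import scala.concurrent.{Future, Promise}

   // Hypothetical resource: records whether close() ran while a read was pending.
   final class FakeCursor {
     private val pendingRead = Promise[Option[Int]]()
     @volatile var readInFlight = false
     @volatile var closedDuringRead = false

     // Starts an asynchronous read that stays incomplete until completeRead().
     def readData(): Future[Option[Int]] = {
       readInFlight = true
       pendingRead.future
     }

     def completeRead(): Unit = {
       readInFlight = false
       pendingRead.success(Some(1))
     }

     def close(): Future[Unit] = {
       if (readInFlight) closedDuringRead = true
       Future.unit
     }
   }

   object RaceModel {
     // Returns true when close() overlapped the pending read.
     def run(): Boolean = {
       val cursor = new FakeCursor
       val read = cursor.readData() // step 1: onPull starts an asynchronous read
       cursor.close()               // step 2: postStop closes without waiting
       cursor.completeRead()        // the read only finishes after close()
       assert(read.isCompleted)
       cursor.closedDuringRead      // step 3: close ran during the read
     }
   }
   ```

   In this model `RaceModel.run()` reports that `close()` ran while the read 
was still pending, which is the overlap described in step 3.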
   
   ### Consequences
   
   - **Resource corruption**: if the user's `close` and `readData` 
implementations are not safe to call concurrently (e.g., closing a database 
cursor while reading from it), the resource may be corrupted or throw exceptions
   - **Data loss**: `readData` may return valid data that is never delivered 
because the stage has already stopped
   - **Double-close**: when `readData` eventually completes with `None`, the 
normal end-of-stream path (`UnfoldResourceSourceAsync.scala:82-87`) may call 
`close(resource)` again, on top of the `close` already initiated by `postStop`
   
   ### Why `UnfoldAsync` is NOT affected
   
   `UnfoldAsync` 
(`stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala:87-122`) also 
has an in-flight `Future` from `f(state)` when downstream cancels, but it has 
no `postStop` and no external resource to clean up. The in-flight `Future` 
simply completes and its callback is rejected by `getAsyncCallback` (stage 
already stopped). There is no concurrent `close()` call. `UnfoldAsyncJava` 
(lines 129-179) has the same property.
   
   ### Why the sync `UnfoldResourceSource` is NOT affected
   
   `UnfoldResourceSource` 
(`stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala`) 
calls `readData(resource)` synchronously on the interpreter thread. 
`postStop()` also runs on the interpreter thread. There is no window for 
concurrent execution.
   
   ### Suggested fix
   
   Track the in-flight `readData` `Future` in a `var` field. In `postStop()`, 
instead of calling `close(resource)` directly, chain `close(resource)` to run 
after the in-flight read completes (or immediately if no read is in flight). 
For example:
   
   ```scala
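   // Last read that was still pending when onPull returned.
   // Only read and written on the stage (interpreter) thread.
   private var inFlightRead: Future[Option[T]] = _
   
   override def onPull(): Unit = maybeResource match {
     case OptionVal.Some(resource) =>
       try {
         val future = readData(resource)
         future.value match {
           case Some(value) =>
             // Already completed: handle it synchronously, nothing to track.
             handle(value)
           case None =>
             // Remember the pending read so postStop can wait for it.
             inFlightRead = future
             future.onComplete(readCallback)(parasitic)
         }
       } catch errorHandler
     case _ => // no open resource, nothing to read
   }
   
   override def postStop(): Unit = maybeResource match {
     case OptionVal.Some(resource) =>
       val pending = inFlightRead
       if (pending ne null) {
         // If the read has already completed, the callback runs right away,
         // so a stale reference to a finished read is harmless.
         pending.onComplete(_ => close(resource))(parasitic)
       } else {
         close(resource)
       }
     case _ => // do nothing
   }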
   ```
   
   The key constraint is: `close(resource)` must not be invoked until any 
in-flight `readData(resource)` has completed.
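
   The ordering constraint can be checked in isolation with plain 
`scala.concurrent`. This is a hedged sketch: `closeAfter` and `CloseAfterRead` 
are hypothetical names, not Pekko API, and `ExecutionContext.parasitic` 
(Scala 2.13+) stands in for the `parasitic` context used in the stage.

   ```scala
   import scala.collection.mutable.ListBuffer
   import scala.concurrent.{Future, Promise}
   import scala.concurrent.ExecutionContext.parasitic

   object CloseAfterRead {
     // Hypothetical helper: run `close` only once `pending` (if any) has completed.
     def closeAfter[T](pending: Option[Future[T]])(close: () => Unit): Unit =
       pending match {
         case Some(read) => read.onComplete(_ => close())(parasitic)
         case None       => close()
       }

     // Replays "stop while a read is pending" on one thread and logs the order.
     def demo(): List[String] = {
       val events = ListBuffer.empty[String]
       val promise = Promise[Option[Int]]()
       // The read counts as finished only after this mapping step has run.
       val read = promise.future.map { v => events += "read done"; v }(parasitic)

       closeAfter(Some(read))(() => { events += "close"; () })
       events += "stop requested"
       promise.success(None) // the pending read completes after the stop
       events.toList
     }
   }
   ```

   Here `close` is registered before the read finishes, yet it only runs once 
the read has completed. With `None` as the pending read, `closeAfter` calls 
`close` immediately.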
   
   ### Affected operators
   
   - `Source.unfoldResourceAsync` — directly affected
   - `FlattenMerge` / `FlattenConcat` fast paths — not affected (no 
`UnfoldResourceSourceAsync` fast path exists in these operators)
   
   ### References
   
   - None - identified during internal code review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

