gmethvin commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2867147052


##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -733,6 +733,22 @@ object Sink {
   def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
     lazyFutureSink[T, M](() => future)
 
+  /**
+   * Turn a `Future[Sink]` into a Sink that will consume the values of the 
source when the future completes
+   * successfully. If the `Future` is completed with a failure the stream is 
failed.
+   *
+   * Unlike [[futureSink]] and [[lazyFutureSink]], this operator materializes 
the inner sink as soon as the future
+   * completes, even if no elements have arrived yet. This means empty streams 
complete normally rather than failing
+   * with [[NeverMaterializedException]]. Elements that arrive before the 
future completes are buffered.
+   *
+   * The materialized future value is completed with the materialized value of 
the future sink or failed if the
+   * future fails, upstream fails, or downstream cancels before the inner sink 
is materialized.

Review Comment:
   Good observation. Refactored `EagerFutureSink` to actually materialize the 
inner sink immediately on future completion, consistent with `FutureFlow` and 
`FutureFlattenSource`. Added a test that verifies the inner sink is 
materialized even when no elements have arrived yet.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to