gmethvin commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2867147052
##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -733,6 +733,22 @@ object Sink {
def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
lazyFutureSink[T, M](() => future)
+ /**
+ * Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes
+ * successfully. If the `Future` is completed with a failure the stream is failed.
+ *
+ * Unlike [[futureSink]] and [[lazyFutureSink]], this operator materializes the inner sink as soon as the future
+ * completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
+ * with [[NeverMaterializedException]]. Elements that arrive before the future completes are buffered.
+ *
+ * The materialized future value is completed with the materialized value of the future sink or failed if the
+ * future fails, upstream fails, or downstream cancels before the inner sink is materialized.
Review Comment:
Good observation. Refactored `EagerFutureSink` to actually materialize the
inner sink immediately on future completion, consistent with `FutureFlow` and
`FutureFlattenSource`. Added a test that verifies the inner sink is
materialized even when no elements have arrived yet.
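
For context, here is a minimal sketch of the existing behavior the Scaladoc contrasts against: running an empty source into `Sink.futureSink` and checking how the materialized future ends. It uses only the existing `Sink.futureSink`, not the new operator, whose name is not shown in this hunk. The object and method names are hypothetical, for illustration only, and it assumes `pekko-stream` is on the classpath.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.NeverMaterializedException
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Try }

// Hypothetical demo object; not part of the PR.
object FutureSinkEmptyStreamDemo {

  // Runs an empty source into Sink.futureSink and reports whether the
  // materialized future failed with NeverMaterializedException.
  def failsWithNeverMaterialized(): Boolean = {
    // The implicit ActorSystem also supplies the Materializer for runWith.
    implicit val system: ActorSystem = ActorSystem("future-sink-demo")
    try {
      // The future holding the inner sink is already completed.
      val innerSink = Future.successful(Sink.seq[Int])

      // No element ever arrives, so the existing operator never
      // materializes the inner sink.
      val materialized = Source.empty[Int].runWith(Sink.futureSink(innerSink))

      Try(Await.result(materialized, 3.seconds)) match {
        case Failure(_: NeverMaterializedException) => true
        case _                                      => false
      }
    } finally {
      // Shut the actor system down so the JVM can exit.
      Await.ready(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(s"failed with NeverMaterializedException: ${failsWithNeverMaterialized()}")
}
```

Per the Scaladoc in this hunk, the check should report a failure for `futureSink`, whereas the new operator is described as completing normally in the same situation.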
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]