Copilot commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2867057188
##########
stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala:
##########
@@ -525,6 +525,24 @@ object Sink {
def completionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]] =
lazyCompletionStageSink[T, M](() => future)
+ /**
+ * Turn a `CompletionStage[Sink]` into a Sink that will consume the values of the source when the future completes
+ * successfully. If the `CompletionStage` is completed with a failure the stream is failed.
+ *
+ * Unlike [[completionStageSink]] and [[lazyCompletionStageSink]], this operator materializes the inner sink as
+ * soon as the future completes, even if no elements have arrived yet. This means empty streams complete normally
+ * rather than failing with [[NeverMaterializedException]]. Elements that arrive before the future completes
+ * are buffered.
+ *
Review Comment:
The Javadoc claims the inner sink materializes “as soon as the future
completes, even if no elements have arrived yet”, but `EagerFutureSink`
currently materializes only after the first element is received (and buffered)
or after upstream completes. Please align the docs with actual behavior, or
update the stage to materialize immediately on future completion.
```suggestion
* Compared to [[completionStageSink]] and [[lazyCompletionStageSink]], this operator eagerly waits for the
* completion of the future while buffering any elements that arrive before that. The inner sink is materialized
* when the future has completed and either the first element has been received (and buffered) or upstream
* completes.
*
* This means that if upstream fails or downstream cancels before the future has completed, the inner sink is
* never materialized and the materialized value fails with [[NeverMaterializedException]], even for empty
* streams. Elements that arrive before the future completes are buffered until the inner sink is materialized.
*
```
##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
(stageLogic, promise.future)
}
}
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike [[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: Future[Sink[T, M]])
+ extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+ val in = Inlet[T]("eagerFutureSink.in")
+ override def initialAttributes = DefaultAttributes.eagerFutureSink
+ override val shape: SinkShape[T] = SinkShape.of(in)
+
+ override def toString: String = "EagerFutureSink"
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+ val promise = Promise[M]()
+ val stageLogic = new GraphStageLogic(shape) with InHandler {
+ private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+ private var bufferedElement: OptionVal[T] = OptionVal.none
+ private var upstreamClosed = false
+ private var upstreamFailed: OptionVal[Throwable] = OptionVal.none
+
+ override def preStart(): Unit = {
+ pull(in)
+ val cb: AsyncCallback[Try[Sink[T, M]]] =
+ getAsyncCallback {
+ case Success(sink) => onSinkReady(sink)
+ case Failure(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ try {
+ future.onComplete(cb.invoke)(ExecutionContext.parasitic)
+ } catch {
+ case NonFatal(e) =>
+ promise.tryFailure(e)
+ failStage(e)
+ }
+ }
+
+ override def onPush(): Unit = {
+ sinkReady match {
+ case OptionVal.Some(sink) =>
+ switchTo(sink, OptionVal.Some(grab(in)))
+ case OptionVal.None =>
+ bufferedElement = OptionVal.Some(grab(in))
+ }
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ upstreamClosed = true
+ sinkReady match {
+ case OptionVal.Some(sink) =>
+ switchTo(sink, OptionVal.none)
+ case OptionVal.None =>
+ setKeepGoing(true)
+ }
+ }
+
+ override def onUpstreamFailure(ex: Throwable): Unit = {
+ upstreamFailed = OptionVal.Some(ex)
+ upstreamClosed = true
+ sinkReady match {
+ case OptionVal.Some(_) =>
+ promise.tryFailure(ex)
+ failStage(ex)
+ case OptionVal.None =>
+ setKeepGoing(true)
Review Comment:
`onUpstreamFailure` defers failing the stage and the promise until the future
sink resolves: when `sinkReady` is empty it only calls `setKeepGoing(true)`.
This can leave the stream (and the materialized `Future[M]`) hanging
indefinitely if the future never completes, and it can also let a later future
failure override the original upstream failure. Consider failing the stage and
the promise immediately on upstream failure and ignoring any later future
completion.
```suggestion
// Fail the promise and stage immediately even if the sink is not yet ready.
promise.tryFailure(ex)
failStage(ex)
```
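A small standalone sketch of why failing the promise immediately preserves the original upstream failure. This is not code from the PR: the `FirstFailureWins` object and its exception messages are made up for illustration, and it only relies on the standard-library `Promise.tryFailure` contract (the first completion wins, later attempts return `false`).

```scala
import scala.concurrent.Promise
import scala.util.Failure

// Hypothetical demo object; names are illustrative and not part of the PR.
object FirstFailureWins {
  def firstFailure(): Option[Throwable] = {
    val promise = Promise[Int]()
    val upstreamFailure = new RuntimeException("upstream failed")
    val futureFailure = new RuntimeException("future failed")

    // Failing right away records the upstream failure.
    val recorded = promise.tryFailure(upstreamFailure)
    // A later failure of the sink future can no longer replace it.
    val overridden = promise.tryFailure(futureFailure)
    assert(recorded && !overridden)

    // The promise is already completed, so `value` is defined here.
    promise.future.value.collect { case Failure(e) => e }
  }
}
```

With the deferred approach the order of the two `tryFailure` calls can effectively be reversed, which is how the later future failure could end up as the reported cause.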
##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
(stageLogic, promise.future)
}
}
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike [[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: Future[Sink[T, M]])
+ extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+ val in = Inlet[T]("eagerFutureSink.in")
+ override def initialAttributes = DefaultAttributes.eagerFutureSink
+ override val shape: SinkShape[T] = SinkShape.of(in)
+
+ override def toString: String = "EagerFutureSink"
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
+ val promise = Promise[M]()
+ val stageLogic = new GraphStageLogic(shape) with InHandler {
+ private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+ private var bufferedElement: OptionVal[T] = OptionVal.none
+ private var upstreamClosed = false
Review Comment:
`EagerFutureSink` doesn’t override `postStop` to fail the materialized
`promise` when the stage is abruptly terminated before the future resolves /
the inner sink is materialized. This can leave the returned `Future[M]`
hanging. Consider adding a `postStop` hook (e.g., failing with
`AbruptStageTerminationException`) similar to other promise-materializing
stages in this module.
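A minimal sketch of the `postStop` pattern, shown on a deliberately simplified stage rather than on `EagerFutureSink` itself. `FirstElementSink` is a hypothetical stage invented for this illustration; the only point is the `postStop` override, and the real stage may need additional handling.

```scala
import org.apache.pekko.stream.{ AbruptStageTerminationException, Attributes, Inlet, SinkShape }
import org.apache.pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
import scala.concurrent.{ Future, Promise }

// Hypothetical stage (not part of the PR): materializes a Future of the first element.
final class FirstElementSink[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[T]] {
  val in: Inlet[T] = Inlet[T]("firstElementSink.in")
  override val shape: SinkShape[T] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[T]) = {
    val promise = Promise[T]()
    val logic = new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        promise.trySuccess(grab(in))
        completeStage()
      }

      override def onUpstreamFinish(): Unit = {
        promise.tryFailure(new NoSuchElementException("empty stream"))
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        promise.tryFailure(ex)
        failStage(ex)
      }

      // Runs on every stop, including abrupt termination of the materializer.
      // tryFailure is a no-op if the promise was already completed above.
      override def postStop(): Unit =
        promise.tryFailure(new AbruptStageTerminationException(this))

      setHandler(in, this)
    }
    (logic, promise.future)
  }
}
```

Without the `postStop` override, shutting down the materializer while the stage is still waiting would leave the materialized future uncompleted.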
##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -733,6 +733,22 @@ object Sink {
def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
lazyFutureSink[T, M](() => future)
+ /**
+ * Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes
+ * successfully. If the `Future` is completed with a failure the stream is failed.
+ *
+ * Unlike [[futureSink]] and [[lazyFutureSink]], this operator materializes the inner sink as soon as the future
+ * completes, even if no elements have arrived yet. This means empty streams complete normally rather than failing
+ * with [[NeverMaterializedException]]. Elements that arrive before the future completes are buffered.
+ *
+ * The materialized future value is completed with the materialized value of the future sink or failed if the
+ * future fails, upstream fails, or downstream cancels before the inner sink is materialized.
Review Comment:
The scaladoc says the inner sink is materialized “as soon as the future
completes, even if no elements have arrived yet”, but the current
`EagerFutureSink` implementation only materializes once either an element has
been received (buffered) or upstream has completed. Either adjust the
implementation to actually materialize on future completion, or tighten the
docs to match the implemented semantics.
```suggestion
* Unlike [[futureSink]] and [[lazyFutureSink]], this operator buffers elements that arrive before the future
* completes and then materializes the inner sink once the future has completed and either an element has arrived
* (and been buffered) or the upstream completes.
*
* The materialized future value is completed with the materialized value of the future sink. It is failed if the
* future fails, or with [[NeverMaterializedException]] if upstream fails or downstream cancels before the inner
* sink has been materialized.
```
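For reference, a sketch of the baseline behavior that the doc comment contrasts against: the existing `Sink.futureSink` on an empty stream. The `EmptyStreamBaseline` object is hypothetical, and the sketch assumes, as the scaladoc in this diff implies, that `futureSink` fails the materialized future with `NeverMaterializedException` when no element arrives. It intentionally does not exercise the new operator, whose exact semantics are what this thread is discussing.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; not part of the PR.
object EmptyStreamBaseline {
  def run(): Try[Future[Seq[Int]]] = {
    implicit val system: ActorSystem = ActorSystem("empty-stream-baseline")
    try {
      // The sink future is already completed, but no element ever arrives.
      val inner: Future[Sink[Int, Future[Seq[Int]]]] = Future.successful(Sink.seq[Int])
      val mat: Future[Future[Seq[Int]]] = Source.empty[Int].runWith(Sink.futureSink(inner))
      // Wait for the outer future and return how it completed.
      Await.ready(mat, 5.seconds).value.get
    } finally Await.result(system.terminate(), 5.seconds)
  }
}
```

A test for the new operator could use the same shape and assert on whichever semantics the docs and implementation finally agree on.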