Copilot commented on code in PR #2684:
URL: https://github.com/apache/pekko/pull/2684#discussion_r2867057188


##########
stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala:
##########
@@ -525,6 +525,24 @@ object Sink {
   def completionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, 
CompletionStage[M]] =
     lazyCompletionStageSink[T, M](() => future)
 
+  /**
+   * Turn a `CompletionStage[Sink]` into a Sink that will consume the values 
of the source when the future completes
+   * successfully. If the `CompletionStage` is completed with a failure the 
stream is failed.
+   *
+   * Unlike [[completionStageSink]] and [[lazyCompletionStageSink]], this 
operator materializes the inner sink as
+   * soon as the future completes, even if no elements have arrived yet. This 
means empty streams complete normally
+   * rather than failing with [[NeverMaterializedException]]. Elements that 
arrive before the future completes
+   * are buffered.
+   *

Review Comment:
   The Javadoc claims the inner sink materializes “as soon as the future 
completes, even if no elements have arrived yet”, but `EagerFutureSink` 
currently materializes only after the first element is received (and buffered) 
or after upstream completes. Please align the docs with actual behavior, or 
update the stage to materialize immediately on future completion.
   ```suggestion
      * Compared to [[completionStageSink]] and [[lazyCompletionStageSink]], 
this operator eagerly waits for the
      * completion of the future while buffering any elements that arrive 
before that. The inner sink is materialized
      * when the future has completed and either the first element has been 
received (and buffered) or upstream
      * completes.
      *
      * This means that if upstream fails or downstream cancels before the 
future has completed, the inner sink is
      * never materialized and the materialized value fails with 
[[NeverMaterializedException]], even for empty
      * streams. Elements that arrive before the future completes are buffered 
until the inner sink is materialized.
      *
   ```



##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
     (stageLogic, promise.future)
   }
 }
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that 
materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike 
[[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and 
completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: 
Future[Sink[T, M]])
+    extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+  val in = Inlet[T]("eagerFutureSink.in")
+  override def initialAttributes = DefaultAttributes.eagerFutureSink
+  override val shape: SinkShape[T] = SinkShape.of(in)
+
+  override def toString: String = "EagerFutureSink"
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[M]) = {
+    val promise = Promise[M]()
+    val stageLogic = new GraphStageLogic(shape) with InHandler {
+      private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+      private var bufferedElement: OptionVal[T] = OptionVal.none
+      private var upstreamClosed = false
+      private var upstreamFailed: OptionVal[Throwable] = OptionVal.none
+
+      override def preStart(): Unit = {
+        pull(in)
+        val cb: AsyncCallback[Try[Sink[T, M]]] =
+          getAsyncCallback {
+            case Success(sink) => onSinkReady(sink)
+            case Failure(e)    =>
+              promise.tryFailure(e)
+              failStage(e)
+          }
+        try {
+          future.onComplete(cb.invoke)(ExecutionContext.parasitic)
+        } catch {
+          case NonFatal(e) =>
+            promise.tryFailure(e)
+            failStage(e)
+        }
+      }
+
+      override def onPush(): Unit = {
+        sinkReady match {
+          case OptionVal.Some(sink) =>
+            switchTo(sink, OptionVal.Some(grab(in)))
+          case OptionVal.None =>
+            bufferedElement = OptionVal.Some(grab(in))
+        }
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        upstreamClosed = true
+        sinkReady match {
+          case OptionVal.Some(sink) =>
+            switchTo(sink, OptionVal.none)
+          case OptionVal.None =>
+            setKeepGoing(true)
+        }
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        upstreamFailed = OptionVal.Some(ex)
+        upstreamClosed = true
+        sinkReady match {
+          case OptionVal.Some(_) =>
+            promise.tryFailure(ex)
+            failStage(ex)
+          case OptionVal.None =>
+            setKeepGoing(true)

Review Comment:
   `onUpstreamFailure` defers failing the stage/promise until the future sink 
resolves (by only setting `setKeepGoing(true)` when `sinkReady` is empty). This 
can keep the stream (and materialized `Future[M]`) hanging indefinitely if the 
future never completes, and it can also let a later future failure override the 
original upstream failure. Consider failing the stage/promise immediately on 
upstream failure (and ignoring any later future completion).
   ```suggestion
               // Fail the promise and stage immediately even if the sink is 
not yet ready.
               promise.tryFailure(ex)
               failStage(ex)
   ```



##########
stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala:
##########
@@ -661,3 +662,173 @@ import org.reactivestreams.Subscriber
     (stageLogic, promise.future)
   }
 }
+
+/**
+ * INTERNAL API
+ *
+ * Dedicated stage for [[pekko.stream.scaladsl.Sink.eagerFutureSink]] that 
materializes the inner sink
+ * when the future completes rather than waiting for the first element. Unlike 
[[LazySink]], this
+ * correctly handles empty streams by materializing the inner sink and 
completing it normally.
+ */
+@InternalApi final private[stream] class EagerFutureSink[T, M](future: 
Future[Sink[T, M]])
+    extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
+  val in = Inlet[T]("eagerFutureSink.in")
+  override def initialAttributes = DefaultAttributes.eagerFutureSink
+  override val shape: SinkShape[T] = SinkShape.of(in)
+
+  override def toString: String = "EagerFutureSink"
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, Future[M]) = {
+    val promise = Promise[M]()
+    val stageLogic = new GraphStageLogic(shape) with InHandler {
+      private var sinkReady: OptionVal[Sink[T, M]] = OptionVal.none
+      private var bufferedElement: OptionVal[T] = OptionVal.none
+      private var upstreamClosed = false

Review Comment:
   `EagerFutureSink` doesn’t override `postStop` to fail the materialized 
`promise` when the stage is abruptly terminated before the future resolves / 
the inner sink is materialized. This can leave the returned `Future[M]` 
hanging. Consider adding a `postStop` hook (e.g., failing with 
`AbruptStageTerminationException`) similar to other promise-materializing 
stages in this module.



##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala:
##########
@@ -733,6 +733,22 @@ object Sink {
   def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] =
     lazyFutureSink[T, M](() => future)
 
+  /**
+   * Turn a `Future[Sink]` into a Sink that will consume the values of the 
source when the future completes
+   * successfully. If the `Future` is completed with a failure the stream is 
failed.
+   *
+   * Unlike [[futureSink]] and [[lazyFutureSink]], this operator materializes 
the inner sink as soon as the future
+   * completes, even if no elements have arrived yet. This means empty streams 
complete normally rather than failing
+   * with [[NeverMaterializedException]]. Elements that arrive before the 
future completes are buffered.
+   *
+   * The materialized future value is completed with the materialized value of 
the future sink or failed if the
+   * future fails, upstream fails, or downstream cancels before the inner sink 
is materialized.

Review Comment:
   The scaladoc says the inner sink is materialized “as soon as the future 
completes, even if no elements have arrived yet”, but the current 
`EagerFutureSink` implementation only materializes once either an element has 
been received (buffered) or upstream has completed. Either adjust the 
implementation to actually materialize on future completion, or tighten the 
docs to match the implemented semantics.
   ```suggestion
      * Unlike [[futureSink]] and [[lazyFutureSink]], this operator buffers 
elements that arrive before the future
      * completes and then materializes the inner sink once the future has 
completed and either an element has arrived
      * (and been buffered) or the upstream completes.
      *
      * The materialized future value is completed with the materialized value 
of the future sink. It is failed if the
      * future fails, or with [[NeverMaterializedException]] if upstream fails 
or downstream cancels before the inner
      * sink has been materialized.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to